tycho_client/feed/
synchronizer.rs

1use std::{collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6use tokio::{
7    select,
8    sync::{
9        mpsc::{channel, error::SendError, Receiver, Sender},
10        oneshot,
11    },
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info, instrument, trace, warn};
16use tycho_common::{
17    dto::{
18        BlockChanges, BlockParam, Chain, ComponentTvlRequestBody, EntryPointWithTracingParams,
19        ExtractorIdentity, ProtocolComponent, ResponseAccount, ResponseProtocolState,
20        TracingResult, VersionParam,
21    },
22    Bytes,
23};
24
25use crate::{
26    deltas::{DeltasClient, SubscriptionOptions},
27    feed::{
28        component_tracker::{ComponentFilter, ComponentTracker},
29        BlockHeader, HeaderLike,
30    },
31    rpc::{RPCClient, RPCError},
32    DeltasError,
33};
34
/// Errors surfaced by the protocol state synchronizer.
#[derive(Error, Debug)]
pub enum SynchronizerError {
    /// RPC client failures.
    #[error("RPC error: {0}")]
    RPCError(#[from] RPCError),

    /// Issues with the main channel, e.g. a message that could not be sent.
    #[error("{0}")]
    ChannelError(String),

    /// Timeout elapsed errors.
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// Failed to close the synchronizer.
    #[error("Failed to close synchronizer: {0}")]
    CloseError(String),

    /// Server connection failures or interruptions.
    #[error("Connection error: {0}")]
    ConnectionError(String),

    /// Connection closed; produced when the deltas client reports `DeltasError::NotConnected`.
    #[error("Connection closed")]
    ConnectionClosed,
}
61
/// Convenience alias for results whose error type is [`SynchronizerError`].
pub type SyncResult<T> = Result<T, SynchronizerError>;
63
64impl<T> From<SendError<T>> for SynchronizerError {
65    fn from(err: SendError<T>) -> Self {
66        SynchronizerError::ChannelError(format!("Failed to send message: {err}"))
67    }
68}
69
70impl From<DeltasError> for SynchronizerError {
71    fn from(err: DeltasError) -> Self {
72        match err {
73            DeltasError::NotConnected => SynchronizerError::ConnectionClosed,
74            _ => SynchronizerError::ConnectionError(err.to_string()),
75        }
76    }
77}
78
/// Synchronizes the state of a single extractor by combining RPC snapshots with a deltas
/// subscription.
pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
    /// Identifies the extractor; its chain and name parameterize the RPC requests and the
    /// deltas subscription.
    extractor_id: ExtractorIdentity,
    /// Forwarded to the protocol-state RPC request when fetching snapshots.
    retrieve_balances: bool,
    /// Client used for the snapshot-related RPC requests.
    rpc_client: R,
    /// Client used to subscribe to, and unsubscribe from, delta messages.
    deltas_client: D,
    /// Upper bound on the number of `state_sync` attempts made by `start`.
    max_retries: u64,
    /// Pause between consecutive `state_sync` attempts.
    retry_cooldown: Duration,
    /// When `false`, `get_snapshots` returns an empty message without querying the RPC.
    include_snapshots: bool,
    /// Keeps track of the components and contracts that are currently followed.
    component_tracker: ComponentTracker<R>,
    /// Header of the last block whose message was sent downstream, if any.
    last_synced_block: Option<BlockHeader>,
    /// Seconds to wait for the first delta message after subscribing.
    timeout: u64,
    /// Whether `get_snapshots` also fetches the TVL of the requested components.
    include_tvl: bool,
}
92
/// A protocol component bundled with the state retrieved for it in a snapshot.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct ComponentWithState {
    /// Protocol state returned by the RPC for this component.
    pub state: ResponseProtocolState,
    /// The component the state belongs to.
    pub component: ProtocolComponent,
    /// TVL of the component; `None` when TVL was not requested or not returned.
    pub component_tvl: Option<f64>,
    /// Traced entry points of the component; empty when none were fetched.
    pub entrypoints: Vec<(EntryPointWithTracingParams, TracingResult)>,
}
100
/// Snapshot of component states and contract storage.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct Snapshot {
    /// Component states, keyed by component id.
    pub states: HashMap<String, ComponentWithState>,
    /// Contract (VM) storage, keyed by contract address.
    pub vm_storage: HashMap<Bytes, ResponseAccount>,
}
106
107impl Snapshot {
108    fn extend(&mut self, other: Snapshot) {
109        self.states.extend(other.states);
110        self.vm_storage.extend(other.vm_storage);
111    }
112
113    pub fn get_states(&self) -> &HashMap<String, ComponentWithState> {
114        &self.states
115    }
116
117    pub fn get_vm_storage(&self) -> &HashMap<Bytes, ResponseAccount> {
118        &self.vm_storage
119    }
120}
121
/// Message emitted by a synchronizer for one block: the block header, snapshots of newly
/// tracked components, the block's deltas and the components that are no longer tracked.
#[derive(Clone, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct StateSyncMessage<H>
where
    H: HeaderLike,
{
    /// The block information for this update.
    pub header: H,
    /// Snapshot for new components.
    pub snapshots: Snapshot,
    /// A single delta contains state updates for all tracked components, as well as additional
    /// information about the system components e.g. newly added components (even below tvl), tvl
    /// updates, balance updates.
    pub deltas: Option<BlockChanges>,
    /// Components that stopped being tracked, keyed by component id.
    pub removed_components: HashMap<String, ProtocolComponent>,
}
138
139impl<H> StateSyncMessage<H>
140where
141    H: HeaderLike,
142{
143    pub fn merge(mut self, other: Self) -> Self {
144        // be careful with removed and snapshots attributes here, these can be ambiguous.
145        self.removed_components
146            .retain(|k, _| !other.snapshots.states.contains_key(k));
147        self.snapshots
148            .states
149            .retain(|k, _| !other.removed_components.contains_key(k));
150
151        self.snapshots.extend(other.snapshots);
152        let deltas = match (self.deltas, other.deltas) {
153            (Some(l), Some(r)) => Some(l.merge(r)),
154            (None, Some(r)) => Some(r),
155            (Some(l), None) => Some(l),
156            (None, None) => None,
157        };
158        self.removed_components
159            .extend(other.removed_components);
160        Self {
161            header: other.header,
162            snapshots: self.snapshots,
163            deltas,
164            removed_components: self.removed_components,
165        }
166    }
167}
168
/// Handle for controlling a running synchronizer task.
///
/// The handle bundles the join handle of the spawned task with the sender used to signal a
/// graceful shutdown to it.
pub struct SynchronizerTaskHandle {
    /// Join handle of the spawned synchronizer task.
    join_handle: JoinHandle<()>,
    /// Sender half of the oneshot channel on which the task listens for the close signal.
    close_tx: oneshot::Sender<()>,
}
177
impl SynchronizerTaskHandle {
    /// Bundles the join handle of a spawned synchronizer task with its close-signal sender.
    pub fn new(join_handle: JoinHandle<()>, close_tx: oneshot::Sender<()>) -> Self {
        Self { join_handle, close_tx }
    }

    /// Splits the handle into its join handle and close sender.
    ///
    /// This allows monitoring the task completion separately from controlling shutdown.
    /// The join handle can be used with FuturesUnordered for monitoring, while the
    /// close sender can be used to signal graceful shutdown.
    pub fn split(self) -> (JoinHandle<()>, oneshot::Sender<()>) {
        (self.join_handle, self.close_tx)
    }
}

/// StateSynchronizer
///
/// Used to synchronize the state of a single protocol. The synchronizer is responsible for
/// delivering messages to the client that let it reconstruct subsets of the protocol state.
///
/// This involves deciding which components to track according to the client's preferences,
/// retrieving & emitting snapshots of components which the client has not seen yet and
/// subsequently delivering delta messages for the components that have changed.
#[async_trait]
pub trait StateSynchronizer: Send + Sync + 'static {
    /// Performs the setup work of the synchronizer.
    ///
    /// NOTE(review): presumably expected to be called before `start`; the implementation in
    /// this file uses it to load the initial set of tracked components — confirm with callers.
    async fn initialize(&mut self) -> SyncResult<()>;
    /// Starts the state synchronization, consuming the synchronizer.
    /// Returns a handle for controlling the running task and a receiver for messages.
    async fn start(
        mut self,
    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>);
}
210
211impl<R, D> ProtocolStateSynchronizer<R, D>
212where
213    // TODO: Consider moving these constraints directly to the
214    // client...
215    R: RPCClient + Clone + Send + Sync + 'static,
216    D: DeltasClient + Clone + Send + Sync + 'static,
217{
218    /// Creates a new state synchronizer.
219    #[allow(clippy::too_many_arguments)]
220    pub fn new(
221        extractor_id: ExtractorIdentity,
222        retrieve_balances: bool,
223        component_filter: ComponentFilter,
224        max_retries: u64,
225        retry_cooldown: Duration,
226        include_snapshots: bool,
227        include_tvl: bool,
228        rpc_client: R,
229        deltas_client: D,
230        timeout: u64,
231    ) -> Self {
232        Self {
233            extractor_id: extractor_id.clone(),
234            retrieve_balances,
235            rpc_client: rpc_client.clone(),
236            include_snapshots,
237            deltas_client,
238            component_tracker: ComponentTracker::new(
239                extractor_id.chain,
240                extractor_id.name.as_str(),
241                component_filter,
242                rpc_client,
243            ),
244            max_retries,
245            retry_cooldown,
246            last_synced_block: None,
247            timeout,
248            include_tvl,
249        }
250    }
251
252    /// Retrieves state snapshots of the requested components
253    #[allow(deprecated)]
254    async fn get_snapshots<'a, I: IntoIterator<Item = &'a String>>(
255        &mut self,
256        header: BlockHeader,
257        ids: Option<I>,
258    ) -> SyncResult<StateSyncMessage<BlockHeader>> {
259        if !self.include_snapshots {
260            return Ok(StateSyncMessage { header, ..Default::default() });
261        }
262        let version = VersionParam::new(
263            None,
264            Some(BlockParam {
265                chain: Some(self.extractor_id.chain),
266                hash: None,
267                number: Some(header.number as i64),
268            }),
269        );
270
271        // Use given ids or use all if not passed
272        let component_ids: Vec<_> = match ids {
273            Some(ids) => ids.into_iter().cloned().collect(),
274            None => self
275                .component_tracker
276                .get_tracked_component_ids(),
277        };
278
279        if component_ids.is_empty() {
280            return Ok(StateSyncMessage { header, ..Default::default() });
281        }
282
283        let component_tvl = if self.include_tvl {
284            let body = ComponentTvlRequestBody::id_filtered(
285                component_ids.clone(),
286                self.extractor_id.chain,
287            );
288            self.rpc_client
289                .get_component_tvl_paginated(&body, 100, 4)
290                .await?
291                .tvl
292        } else {
293            HashMap::new()
294        };
295
296        //TODO: Improve this, we should not query for every component, but only for the ones that
297        // could have entrypoints. Maybe apply a filter per protocol?
298        let entrypoints_result = if self.extractor_id.chain == Chain::Ethereum {
299            // Fetch entrypoints
300            let result = self
301                .rpc_client
302                .get_traced_entry_points_paginated(
303                    self.extractor_id.chain,
304                    &self.extractor_id.name,
305                    &component_ids,
306                    100,
307                    4,
308                )
309                .await?;
310            self.component_tracker
311                .process_entrypoints(&result.clone().into());
312            Some(result)
313        } else {
314            None
315        };
316
317        // Fetch protocol states
318        let mut protocol_states = self
319            .rpc_client
320            .get_protocol_states_paginated(
321                self.extractor_id.chain,
322                &component_ids,
323                &self.extractor_id.name,
324                self.retrieve_balances,
325                &version,
326                100,
327                4,
328            )
329            .await?
330            .states
331            .into_iter()
332            .map(|state| (state.component_id.clone(), state))
333            .collect::<HashMap<_, _>>();
334
335        trace!(states=?&protocol_states, "Retrieved ProtocolStates");
336        let states = self
337            .component_tracker
338            .components
339            .values()
340            .filter_map(|component| {
341                if let Some(state) = protocol_states.remove(&component.id) {
342                    Some((
343                        component.id.clone(),
344                        ComponentWithState {
345                            state,
346                            component: component.clone(),
347                            component_tvl: component_tvl
348                                .get(&component.id)
349                                .cloned(),
350                            entrypoints: entrypoints_result
351                                .as_ref()
352                                .map(|r| {
353                                    r.traced_entry_points
354                                        .get(&component.id)
355                                        .cloned()
356                                        .unwrap_or_default()
357                                })
358                                .unwrap_or_default(),
359                        },
360                    ))
361                } else if component_ids.contains(&component.id) {
362                    // only emit error event if we requested this component
363                    let component_id = &component.id;
364                    error!(?component_id, "Missing state for native component!");
365                    None
366                } else {
367                    None
368                }
369            })
370            .collect();
371
372        // Fetch contract states
373        let contract_ids = self
374            .component_tracker
375            .get_contracts_by_component(&component_ids);
376        let vm_storage = if !contract_ids.is_empty() {
377            let ids: Vec<Bytes> = contract_ids
378                .clone()
379                .into_iter()
380                .collect();
381            let contract_states = self
382                .rpc_client
383                .get_contract_state_paginated(
384                    self.extractor_id.chain,
385                    ids.as_slice(),
386                    &self.extractor_id.name,
387                    &version,
388                    100,
389                    4,
390                )
391                .await?
392                .accounts
393                .into_iter()
394                .map(|acc| (acc.address.clone(), acc))
395                .collect::<HashMap<_, _>>();
396
397            trace!(states=?&contract_states, "Retrieved ContractState");
398
399            let contract_address_to_components = self
400                .component_tracker
401                .components
402                .iter()
403                .filter_map(|(id, comp)| {
404                    if component_ids.contains(id) {
405                        Some(
406                            comp.contract_ids
407                                .iter()
408                                .map(|address| (address.clone(), comp.id.clone())),
409                        )
410                    } else {
411                        None
412                    }
413                })
414                .flatten()
415                .fold(HashMap::<Bytes, Vec<String>>::new(), |mut acc, (addr, c_id)| {
416                    acc.entry(addr).or_default().push(c_id);
417                    acc
418                });
419
420            contract_ids
421                .iter()
422                .filter_map(|address| {
423                    if let Some(state) = contract_states.get(address) {
424                        Some((address.clone(), state.clone()))
425                    } else if let Some(ids) = contract_address_to_components.get(address) {
426                        // only emit error even if we did actually request this address
427                        error!(
428                            ?address,
429                            ?ids,
430                            "Component with lacking contract storage encountered!"
431                        );
432                        None
433                    } else {
434                        None
435                    }
436                })
437                .collect()
438        } else {
439            HashMap::new()
440        };
441
442        Ok(StateSyncMessage {
443            header,
444            snapshots: Snapshot { states, vm_storage },
445            deltas: None,
446            removed_components: HashMap::new(),
447        })
448    }
449
    /// Main method that does all the work.
    ///
    /// Subscribes to the deltas of the extractor, waits for the first usable delta message,
    /// emits an initial message (with a full snapshot unless the first message directly
    /// follows the last synced block) and then forwards one message per received delta until
    /// an error occurs or the close signal arrives. The subscription is always cancelled
    /// before returning, except when subscribing itself failed.
    ///
    /// ## Return Value
    ///
    /// Returns a `Result` where:
    /// - `Ok(())` - Synchronization completed successfully (usually due to close signal)
    /// - `Err((error, None))` - Error occurred AND close signal was received (don't retry)
    /// - `Err((error, Some(end_rx)))` - Error occurred but close signal was NOT received (can
    ///   retry)
    ///
    /// The returned `end_rx` (if any) should be reused for retry attempts since the close
    /// signal may still arrive and we want to remain cancellable across retries.
    ///
    /// Note that the current implementation always hands `end_rx` back on error, i.e. the
    /// `Err((error, None))` case is never produced by this function.
    #[instrument(skip(self, block_tx, end_rx), fields(extractor_id = %self.extractor_id))]
    async fn state_sync(
        &mut self,
        block_tx: &mut Sender<SyncResult<StateSyncMessage<BlockHeader>>>,
        mut end_rx: oneshot::Receiver<()>,
    ) -> Result<(), (SynchronizerError, Option<oneshot::Receiver<()>>)> {
        // initialisation
        let subscription_options = SubscriptionOptions::new().with_state(self.include_snapshots);
        let (subscription_id, mut msg_rx) = match self
            .deltas_client
            .subscribe(self.extractor_id.clone(), subscription_options)
            .await
        {
            Ok(result) => result,
            // Nothing to clean up yet: no subscription exists and `end_rx` is untouched.
            Err(e) => return Err((e.into(), Some(end_rx))),
        };

        // The actual work lives in an inner async block so that every exit path (`?`, early
        // `return`) falls through to the unsubscribe below instead of leaving the function.
        let result = async {
            info!("Waiting for deltas...");
            let mut warned = false;
            // Wait for the first message worth processing. The timeout is created anew on
            // every iteration, so it bounds the wait for each individual message.
            let mut first_msg = loop {
                let msg = select! {
                    deltas_result = timeout(Duration::from_secs(self.timeout), msg_rx.recv()) => {
                        deltas_result
                            .map_err(|_| {
                                SynchronizerError::Timeout(format!(
                                    "First deltas took longer than {t}s to arrive",
                                    t = self.timeout
                                ))
                            })?
                            .ok_or_else(|| {
                                SynchronizerError::ConnectionError(
                                    "Deltas channel closed before first message".to_string(),
                                )
                            })?
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal while waiting for first deltas");
                        return Ok(());
                    }
                };

                let incoming = BlockHeader::from_block(msg.get_block(), msg.is_revert());
                // After a restart, skip messages at or below the height we already synced,
                // unless the message directly builds on the last synced block.
                if let Some(current) = &self.last_synced_block {
                    if current.number >= incoming.number && !self.is_next_expected(&incoming) {
                        if !warned {
                            info!(extractor=%self.extractor_id, from=incoming.number, to=current.number, "Syncing. Skipping messages");
                            warned = true;
                        }
                        continue
                    }
                }
                break msg;
            };

            self.filter_deltas(&mut first_msg);

            // initial snapshot
            let block = first_msg.get_block().clone();
            info!(height = &block.number, "First deltas received");
            let header = BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert());
            let deltas_msg = StateSyncMessage {
                header: BlockHeader::from_block(first_msg.get_block(), first_msg.is_revert()),
                snapshots: Default::default(),
                deltas: Some(first_msg),
                removed_components: Default::default(),
            };

            // If possible skip retrieving snapshots
            let msg = if !self.is_next_expected(&header) {
                info!("Retrieving snapshot");
                // The snapshot header is built with the revert flag set to `false`; merging
                // with `deltas_msg` replaces it with the header of the delta message.
                let snapshot = self
                    .get_snapshots::<Vec<&String>>(
                        BlockHeader::from_block(&block, false),
                        None,
                    )
                    .await?
                    .merge(deltas_msg);
                let n_components = self.component_tracker.components.len();
                let n_snapshots = snapshot.snapshots.states.len();
                info!(n_components, n_snapshots, "Initial snapshot retrieved, starting delta message feed");
                snapshot
            } else {
                deltas_msg
            };
            block_tx.send(Ok(msg)).await?;
            // Only advance the sync position once the message was handed to the channel.
            self.last_synced_block = Some(header.clone());
            loop {
                select! {
                    deltas_opt = msg_rx.recv() => {
                        if let Some(mut deltas) = deltas_opt {
                            let header = BlockHeader::from_block(deltas.get_block(), deltas.is_revert());
                            debug!(block_number=?header.number, "Received delta message");

                            let (snapshots, removed_components) = {
                                // 1. Remove components based on latest changes
                                // 2. Add components based on latest changes, query those for snapshots
                                let (to_add, to_remove) = self.component_tracker.filter_updated_components(&deltas);

                                // Only components we don't track yet need a snapshot,
                                let requiring_snapshot: Vec<_> = to_add
                                    .iter()
                                    .filter(|id| {
                                        !self.component_tracker
                                            .components
                                            .contains_key(id.as_str())
                                    })
                                    .collect();
                                debug!(components=?requiring_snapshot, "SnapshotRequest");
                                // Tracking starts before the snapshot request: `get_snapshots`
                                // reads the components from the tracker.
                                self.component_tracker
                                    .start_tracking(requiring_snapshot.as_slice())
                                    .await?;
                                let snapshots = self
                                    .get_snapshots(header.clone(), Some(requiring_snapshot))
                                    .await?
                                    .snapshots;

                                let removed_components = if !to_remove.is_empty() {
                                    self.component_tracker.stop_tracking(&to_remove)
                                } else {
                                    Default::default()
                                };

                                (snapshots, removed_components)
                            };

                            // 3. Update entrypoints on the tracker (affects which contracts are tracked)
                            self.component_tracker.process_entrypoints(&deltas.dci_update);

                            // 4. Filter deltas by currently tracked components / contracts
                            self.filter_deltas(&mut deltas);
                            let n_changes = deltas.n_changes();

                            // 5. Send the message
                            let next = StateSyncMessage {
                                header: header.clone(),
                                snapshots,
                                deltas: Some(deltas),
                                removed_components,
                            };
                            block_tx.send(Ok(next)).await?;
                            self.last_synced_block = Some(header.clone());

                            debug!(block_number=?header.number, n_changes, "Finished processing delta message");
                        } else {
                            // `recv` returned `None`: the deltas channel is closed.
                            return Err(SynchronizerError::ConnectionError("Deltas channel closed".to_string()));
                        }
                    },
                    _ = &mut end_rx => {
                        info!("Received close signal during state_sync");
                        return Ok(());
                    }
                }
            }
        }.await;

        // This cleanup code now runs regardless of how the function exits (error or channel close)
        warn!(last_synced_block = ?&self.last_synced_block, "Deltas processing ended.");
        // Unsubscribing is best effort: a failure is logged and otherwise ignored.
        let _ = self
            .deltas_client
            .unsubscribe(subscription_id)
            .await
            .map_err(|err| {
                warn!(err=?err, "Unsubscribing from deltas on cleanup failed!");
            });

        // Handle the result: if it succeeded, we're done. If it errored, we need to determine
        // whether the end_rx was consumed (close signal received) or not
        match result {
            Ok(()) => Ok(()), // Success, likely due to close signal
            Err(e) => {
                // The error came from the inner async block. Since the async block
                // can receive close signals (which would return Ok), any error means
                // the close signal was NOT received, so we can return the end_rx for retry
                Err((e, Some(end_rx)))
            }
        }
    }
641
642    fn is_next_expected(&self, incoming: &BlockHeader) -> bool {
643        if let Some(block) = self.last_synced_block.as_ref() {
644            return incoming.parent_hash == block.hash;
645        }
646        false
647    }
648    fn filter_deltas(&self, deltas: &mut BlockChanges) {
649        deltas.filter_by_component(|id| {
650            self.component_tracker
651                .components
652                .contains_key(id)
653        });
654        deltas.filter_by_contract(|id| {
655            self.component_tracker
656                .contracts
657                .contains(id)
658        });
659    }
660}
661
662#[async_trait]
663impl<R, D> StateSynchronizer for ProtocolStateSynchronizer<R, D>
664where
665    R: RPCClient + Clone + Send + Sync + 'static,
666    D: DeltasClient + Clone + Send + Sync + 'static,
667{
668    async fn initialize(&mut self) -> SyncResult<()> {
669        info!("Retrieving relevant protocol components");
670        self.component_tracker
671            .initialise_components()
672            .await?;
673        info!(
674            n_components = self.component_tracker.components.len(),
675            n_contracts = self.component_tracker.contracts.len(),
676            "Finished retrieving components",
677        );
678
679        Ok(())
680    }
681
682    async fn start(
683        mut self,
684    ) -> (SynchronizerTaskHandle, Receiver<SyncResult<StateSyncMessage<BlockHeader>>>) {
685        let (mut tx, rx) = channel(15);
686        let (end_tx, end_rx) = oneshot::channel::<()>();
687
688        let jh = tokio::spawn(async move {
689            let mut retry_count = 0;
690            let mut current_end_rx = end_rx;
691            let mut final_error = None;
692
693            while retry_count < self.max_retries {
694                info!(extractor_id=%&self.extractor_id, retry_count, "(Re)starting synchronization loop");
695
696                let res = self
697                    .state_sync(&mut tx, current_end_rx)
698                    .await;
699                match res {
700                    Ok(()) => {
701                        info!(
702                            extractor_id=%&self.extractor_id,
703                            retry_count,
704                            "State synchronization exited cleanly"
705                        );
706                        return;
707                    }
708                    Err((e, maybe_end_rx)) => {
709                        warn!(
710                            extractor_id=%&self.extractor_id,
711                            retry_count,
712                            error=%e,
713                            "State synchronization errored!"
714                        );
715
716                        // If we have the end_rx back, we can retry
717                        if let Some(recovered_end_rx) = maybe_end_rx {
718                            current_end_rx = recovered_end_rx;
719
720                            if let SynchronizerError::ConnectionClosed = e {
721                                // break synchronization loop if websocket client is dead
722                                error!(
723                                    "Websocket connection closed. State synchronization exiting."
724                                );
725                                let _ = tx.send(Err(e)).await;
726                                return;
727                            } else {
728                                // Store error in case this is our last retry
729                                final_error = Some(e);
730                            }
731                        } else {
732                            // Close signal was received, exit cleanly
733                            info!(extractor_id=%&self.extractor_id, "Received close signal, exiting.");
734                            return;
735                        }
736                    }
737                }
738                sleep(self.retry_cooldown).await;
739                retry_count += 1;
740            }
741            if let Some(e) = final_error {
742                warn!(extractor_id=%&self.extractor_id, retry_count, error=%e, "Max retries exceeded");
743                let _ = tx.send(Err(e)).await;
744            }
745        });
746
747        let handle = SynchronizerTaskHandle::new(jh, end_tx);
748        (handle, rx)
749    }
750}
751
752#[cfg(test)]
753mod test {
754    //! Test suite for ProtocolStateSynchronizer shutdown and cleanup behavior.
755    //!
756    //! ## Test Coverage Strategy:
757    //!
758    //! ### Shutdown & Close Signal Tests:
759    //! - `test_public_close_api_functionality` - Tests public API (start/close lifecycle)
760    //! - `test_close_signal_while_waiting_for_first_deltas` - Close during initial wait
761    //! - `test_close_signal_during_main_processing_loop` - Close during main processing
762    //!
763    //! ### Cleanup & Error Handling Tests:
764    //! - `test_cleanup_runs_when_state_sync_processing_errors` - Cleanup on processing errors
765    //!
766    //! ### Coverage Summary:
767    //! These tests ensure cleanup code (shared state reset + unsubscribe) runs on ALL exit paths:
768    //! ✓ Close signal before first deltas   ✓ Close signal during processing
769    //! ✓ Processing errors                  ✓ Channel closure
770    //! ✓ Public API close operations        ✓ Normal completion
771
772    use std::{collections::HashSet, sync::Arc};
773
774    use test_log::test;
775    use tycho_common::dto::{
776        AddressStorageLocation, Block, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
777        DCIUpdate, EntryPoint, PaginationResponse, ProtocolComponentRequestResponse,
778        ProtocolComponentsRequestBody, ProtocolStateRequestBody, ProtocolStateRequestResponse,
779        ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse, RPCTracerParams,
780        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
781        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, TracingParams,
782    };
783    use uuid::Uuid;
784
785    use super::*;
786    use crate::{deltas::MockDeltasClient, rpc::MockRPCClient, DeltasError, RPCError};
787
788    // Required for mock client to implement clone
789    struct ArcRPCClient<T>(Arc<T>);
790
791    // Default derive(Clone) does require T to be Clone as well.
792    impl<T> Clone for ArcRPCClient<T> {
793        fn clone(&self) -> Self {
794            ArcRPCClient(self.0.clone())
795        }
796    }
797
798    #[async_trait]
799    impl<T> RPCClient for ArcRPCClient<T>
800    where
801        T: RPCClient + Sync + Send + 'static,
802    {
803        async fn get_tokens(
804            &self,
805            request: &TokensRequestBody,
806        ) -> Result<TokensRequestResponse, RPCError> {
807            self.0.get_tokens(request).await
808        }
809
810        async fn get_contract_state(
811            &self,
812            request: &StateRequestBody,
813        ) -> Result<StateRequestResponse, RPCError> {
814            self.0.get_contract_state(request).await
815        }
816
817        async fn get_protocol_components(
818            &self,
819            request: &ProtocolComponentsRequestBody,
820        ) -> Result<ProtocolComponentRequestResponse, RPCError> {
821            self.0
822                .get_protocol_components(request)
823                .await
824        }
825
826        async fn get_protocol_states(
827            &self,
828            request: &ProtocolStateRequestBody,
829        ) -> Result<ProtocolStateRequestResponse, RPCError> {
830            self.0
831                .get_protocol_states(request)
832                .await
833        }
834
835        async fn get_protocol_systems(
836            &self,
837            request: &ProtocolSystemsRequestBody,
838        ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
839            self.0
840                .get_protocol_systems(request)
841                .await
842        }
843
844        async fn get_component_tvl(
845            &self,
846            request: &ComponentTvlRequestBody,
847        ) -> Result<ComponentTvlRequestResponse, RPCError> {
848            self.0.get_component_tvl(request).await
849        }
850
851        async fn get_traced_entry_points(
852            &self,
853            request: &TracedEntryPointRequestBody,
854        ) -> Result<TracedEntryPointRequestResponse, RPCError> {
855            self.0
856                .get_traced_entry_points(request)
857                .await
858        }
859    }
860
861    // Required for mock client to implement clone
862    struct ArcDeltasClient<T>(Arc<T>);
863
864    // Default derive(Clone) does require T to be Clone as well.
865    impl<T> Clone for ArcDeltasClient<T> {
866        fn clone(&self) -> Self {
867            ArcDeltasClient(self.0.clone())
868        }
869    }
870
871    #[async_trait]
872    impl<T> DeltasClient for ArcDeltasClient<T>
873    where
874        T: DeltasClient + Sync + Send + 'static,
875    {
876        async fn subscribe(
877            &self,
878            extractor_id: ExtractorIdentity,
879            options: SubscriptionOptions,
880        ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
881            self.0
882                .subscribe(extractor_id, options)
883                .await
884        }
885
886        async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
887            self.0
888                .unsubscribe(subscription_id)
889                .await
890        }
891
892        async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
893            self.0.connect().await
894        }
895
896        async fn close(&self) -> Result<(), DeltasError> {
897            self.0.close().await
898        }
899    }
900
901    fn with_mocked_clients(
902        native: bool,
903        include_tvl: bool,
904        rpc_client: Option<MockRPCClient>,
905        deltas_client: Option<MockDeltasClient>,
906    ) -> ProtocolStateSynchronizer<ArcRPCClient<MockRPCClient>, ArcDeltasClient<MockDeltasClient>>
907    {
908        let rpc_client = ArcRPCClient(Arc::new(rpc_client.unwrap_or_default()));
909        let deltas_client = ArcDeltasClient(Arc::new(deltas_client.unwrap_or_default()));
910
911        ProtocolStateSynchronizer::new(
912            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
913            native,
914            ComponentFilter::with_tvl_range(50.0, 50.0),
915            1,
916            Duration::from_secs(0),
917            true,
918            include_tvl,
919            rpc_client,
920            deltas_client,
921            10_u64,
922        )
923    }
924
925    fn state_snapshot_native() -> ProtocolStateRequestResponse {
926        ProtocolStateRequestResponse {
927            states: vec![ResponseProtocolState {
928                component_id: "Component1".to_string(),
929                ..Default::default()
930            }],
931            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
932        }
933    }
934
935    fn component_tvl_snapshot() -> ComponentTvlRequestResponse {
936        let tvl = HashMap::from([("Component1".to_string(), 100.0)]);
937
938        ComponentTvlRequestResponse {
939            tvl,
940            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
941        }
942    }
943
944    #[test(tokio::test)]
945    async fn test_get_snapshots_native() {
946        let header = BlockHeader::default();
947        let mut rpc = MockRPCClient::new();
948        rpc.expect_get_protocol_states()
949            .returning(|_| Ok(state_snapshot_native()));
950        rpc.expect_get_traced_entry_points()
951            .returning(|_| {
952                Ok(TracedEntryPointRequestResponse {
953                    traced_entry_points: HashMap::new(),
954                    pagination: PaginationResponse::new(0, 20, 0),
955                })
956            });
957        let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
958        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
959        state_sync
960            .component_tracker
961            .components
962            .insert("Component1".to_string(), component.clone());
963        let components_arg = ["Component1".to_string()];
964        let exp = StateSyncMessage {
965            header: header.clone(),
966            snapshots: Snapshot {
967                states: state_snapshot_native()
968                    .states
969                    .into_iter()
970                    .map(|state| {
971                        (
972                            state.component_id.clone(),
973                            ComponentWithState {
974                                state,
975                                component: component.clone(),
976                                entrypoints: vec![],
977                                component_tvl: None,
978                            },
979                        )
980                    })
981                    .collect(),
982                vm_storage: HashMap::new(),
983            },
984            deltas: None,
985            removed_components: Default::default(),
986        };
987
988        let snap = state_sync
989            .get_snapshots(header, Some(&components_arg))
990            .await
991            .expect("Retrieving snapshot failed");
992
993        assert_eq!(snap, exp);
994    }
995
996    #[test(tokio::test)]
997    async fn test_get_snapshots_native_with_tvl() {
998        let header = BlockHeader::default();
999        let mut rpc = MockRPCClient::new();
1000        rpc.expect_get_protocol_states()
1001            .returning(|_| Ok(state_snapshot_native()));
1002        rpc.expect_get_component_tvl()
1003            .returning(|_| Ok(component_tvl_snapshot()));
1004        rpc.expect_get_traced_entry_points()
1005            .returning(|_| {
1006                Ok(TracedEntryPointRequestResponse {
1007                    traced_entry_points: HashMap::new(),
1008                    pagination: PaginationResponse::new(0, 20, 0),
1009                })
1010            });
1011        let mut state_sync = with_mocked_clients(true, true, Some(rpc), None);
1012        let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };
1013        state_sync
1014            .component_tracker
1015            .components
1016            .insert("Component1".to_string(), component.clone());
1017        let components_arg = ["Component1".to_string()];
1018        let exp = StateSyncMessage {
1019            header: header.clone(),
1020            snapshots: Snapshot {
1021                states: state_snapshot_native()
1022                    .states
1023                    .into_iter()
1024                    .map(|state| {
1025                        (
1026                            state.component_id.clone(),
1027                            ComponentWithState {
1028                                state,
1029                                component: component.clone(),
1030                                component_tvl: Some(100.0),
1031                                entrypoints: vec![],
1032                            },
1033                        )
1034                    })
1035                    .collect(),
1036                vm_storage: HashMap::new(),
1037            },
1038            deltas: None,
1039            removed_components: Default::default(),
1040        };
1041
1042        let snap = state_sync
1043            .get_snapshots(header, Some(&components_arg))
1044            .await
1045            .expect("Retrieving snapshot failed");
1046
1047        assert_eq!(snap, exp);
1048    }
1049
1050    fn state_snapshot_vm() -> StateRequestResponse {
1051        StateRequestResponse {
1052            accounts: vec![
1053                ResponseAccount { address: Bytes::from("0x0badc0ffee"), ..Default::default() },
1054                ResponseAccount { address: Bytes::from("0xbabe42"), ..Default::default() },
1055            ],
1056            pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1057        }
1058    }
1059
1060    fn traced_entry_point_response() -> TracedEntryPointRequestResponse {
1061        TracedEntryPointRequestResponse {
1062            traced_entry_points: HashMap::from([(
1063                "Component1".to_string(),
1064                vec![(
1065                    EntryPointWithTracingParams {
1066                        entry_point: EntryPoint {
1067                            external_id: "entrypoint_a".to_string(),
1068                            target: Bytes::from("0x0badc0ffee"),
1069                            signature: "sig()".to_string(),
1070                        },
1071                        params: TracingParams::RPCTracer(RPCTracerParams {
1072                            caller: Some(Bytes::from("0x0badc0ffee")),
1073                            calldata: Bytes::from("0x0badc0ffee"),
1074                            state_overrides: None,
1075                            prune_addresses: None,
1076                        }),
1077                    },
1078                    TracingResult {
1079                        retriggers: HashSet::from([(
1080                            Bytes::from("0x0badc0ffee"),
1081                            AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1082                        )]),
1083                        accessed_slots: HashMap::from([(
1084                            Bytes::from("0x0badc0ffee"),
1085                            HashSet::from([Bytes::from("0xbadbeef0")]),
1086                        )]),
1087                    },
1088                )],
1089            )]),
1090            pagination: PaginationResponse::new(0, 20, 0),
1091        }
1092    }
1093
1094    #[test(tokio::test)]
1095    async fn test_get_snapshots_vm() {
1096        let header = BlockHeader::default();
1097        let mut rpc = MockRPCClient::new();
1098        rpc.expect_get_protocol_states()
1099            .returning(|_| Ok(state_snapshot_native()));
1100        rpc.expect_get_contract_state()
1101            .returning(|_| Ok(state_snapshot_vm()));
1102        rpc.expect_get_traced_entry_points()
1103            .returning(|_| Ok(traced_entry_point_response()));
1104        let mut state_sync = with_mocked_clients(false, false, Some(rpc), None);
1105        let component = ProtocolComponent {
1106            id: "Component1".to_string(),
1107            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1108            ..Default::default()
1109        };
1110        state_sync
1111            .component_tracker
1112            .components
1113            .insert("Component1".to_string(), component.clone());
1114        let components_arg = ["Component1".to_string()];
1115        let exp = StateSyncMessage {
1116            header: header.clone(),
1117            snapshots: Snapshot {
1118                states: [(
1119                    component.id.clone(),
1120                    ComponentWithState {
1121                        state: ResponseProtocolState {
1122                            component_id: "Component1".to_string(),
1123                            ..Default::default()
1124                        },
1125                        component: component.clone(),
1126                        component_tvl: None,
1127                        entrypoints: vec![(
1128                            EntryPointWithTracingParams {
1129                                entry_point: EntryPoint {
1130                                    external_id: "entrypoint_a".to_string(),
1131                                    target: Bytes::from("0x0badc0ffee"),
1132                                    signature: "sig()".to_string(),
1133                                },
1134                                params: TracingParams::RPCTracer(RPCTracerParams {
1135                                    caller: Some(Bytes::from("0x0badc0ffee")),
1136                                    calldata: Bytes::from("0x0badc0ffee"),
1137                                    state_overrides: None,
1138                                    prune_addresses: None,
1139                                }),
1140                            },
1141                            TracingResult {
1142                                retriggers: HashSet::from([(
1143                                    Bytes::from("0x0badc0ffee"),
1144                                    AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1145                                )]),
1146                                accessed_slots: HashMap::from([(
1147                                    Bytes::from("0x0badc0ffee"),
1148                                    HashSet::from([Bytes::from("0xbadbeef0")]),
1149                                )]),
1150                            },
1151                        )],
1152                    },
1153                )]
1154                .into_iter()
1155                .collect(),
1156                vm_storage: state_snapshot_vm()
1157                    .accounts
1158                    .into_iter()
1159                    .map(|state| (state.address.clone(), state))
1160                    .collect(),
1161            },
1162            deltas: None,
1163            removed_components: Default::default(),
1164        };
1165
1166        let snap = state_sync
1167            .get_snapshots(header, Some(&components_arg))
1168            .await
1169            .expect("Retrieving snapshot failed");
1170
1171        assert_eq!(snap, exp);
1172    }
1173
1174    #[test(tokio::test)]
1175    async fn test_get_snapshots_vm_with_tvl() {
1176        let header = BlockHeader::default();
1177        let mut rpc = MockRPCClient::new();
1178        rpc.expect_get_protocol_states()
1179            .returning(|_| Ok(state_snapshot_native()));
1180        rpc.expect_get_contract_state()
1181            .returning(|_| Ok(state_snapshot_vm()));
1182        rpc.expect_get_component_tvl()
1183            .returning(|_| Ok(component_tvl_snapshot()));
1184        rpc.expect_get_traced_entry_points()
1185            .returning(|_| {
1186                Ok(TracedEntryPointRequestResponse {
1187                    traced_entry_points: HashMap::new(),
1188                    pagination: PaginationResponse::new(0, 20, 0),
1189                })
1190            });
1191        let mut state_sync = with_mocked_clients(false, true, Some(rpc), None);
1192        let component = ProtocolComponent {
1193            id: "Component1".to_string(),
1194            contract_ids: vec![Bytes::from("0x0badc0ffee"), Bytes::from("0xbabe42")],
1195            ..Default::default()
1196        };
1197        state_sync
1198            .component_tracker
1199            .components
1200            .insert("Component1".to_string(), component.clone());
1201        let components_arg = ["Component1".to_string()];
1202        let exp = StateSyncMessage {
1203            header: header.clone(),
1204            snapshots: Snapshot {
1205                states: [(
1206                    component.id.clone(),
1207                    ComponentWithState {
1208                        state: ResponseProtocolState {
1209                            component_id: "Component1".to_string(),
1210                            ..Default::default()
1211                        },
1212                        component: component.clone(),
1213                        component_tvl: Some(100.0),
1214                        entrypoints: vec![],
1215                    },
1216                )]
1217                .into_iter()
1218                .collect(),
1219                vm_storage: state_snapshot_vm()
1220                    .accounts
1221                    .into_iter()
1222                    .map(|state| (state.address.clone(), state))
1223                    .collect(),
1224            },
1225            deltas: None,
1226            removed_components: Default::default(),
1227        };
1228
1229        let snap = state_sync
1230            .get_snapshots(header, Some(&components_arg))
1231            .await
1232            .expect("Retrieving snapshot failed");
1233
1234        assert_eq!(snap, exp);
1235    }
1236
1237    fn mock_clients_for_state_sync() -> (MockRPCClient, MockDeltasClient, Sender<BlockChanges>) {
1238        let mut rpc_client = MockRPCClient::new();
1239        // Mocks for the start_tracking call, these need to come first because they are more
1240        // specific, see: https://docs.rs/mockall/latest/mockall/#matching-multiple-calls
1241        rpc_client
1242            .expect_get_protocol_components()
1243            .with(mockall::predicate::function(
1244                move |request_params: &ProtocolComponentsRequestBody| {
1245                    if let Some(ids) = request_params.component_ids.as_ref() {
1246                        ids.contains(&"Component3".to_string())
1247                    } else {
1248                        false
1249                    }
1250                },
1251            ))
1252            .returning(|_| {
1253                // return Component3
1254                Ok(ProtocolComponentRequestResponse {
1255                    protocol_components: vec![
1256                        // this component shall have a tvl update above threshold
1257                        ProtocolComponent { id: "Component3".to_string(), ..Default::default() },
1258                    ],
1259                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1260                })
1261            });
1262        rpc_client
1263            .expect_get_protocol_states()
1264            .with(mockall::predicate::function(move |request_params: &ProtocolStateRequestBody| {
1265                let expected_id = "Component3".to_string();
1266                if let Some(ids) = request_params.protocol_ids.as_ref() {
1267                    ids.contains(&expected_id)
1268                } else {
1269                    false
1270                }
1271            }))
1272            .returning(|_| {
1273                // return Component3 state
1274                Ok(ProtocolStateRequestResponse {
1275                    states: vec![ResponseProtocolState {
1276                        component_id: "Component3".to_string(),
1277                        ..Default::default()
1278                    }],
1279                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1280                })
1281            });
1282
1283        // mock calls for the initial state snapshots
1284        rpc_client
1285            .expect_get_protocol_components()
1286            .returning(|_| {
1287                // Initial sync of components
1288                Ok(ProtocolComponentRequestResponse {
1289                    protocol_components: vec![
1290                        // this component shall have a tvl update above threshold
1291                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1292                        // this component shall have a tvl update below threshold.
1293                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1294                        // a third component will have a tvl update above threshold
1295                    ],
1296                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1297                })
1298            });
1299        rpc_client
1300            .expect_get_protocol_states()
1301            .returning(|_| {
1302                // Initial state snapshot
1303                Ok(ProtocolStateRequestResponse {
1304                    states: vec![
1305                        ResponseProtocolState {
1306                            component_id: "Component1".to_string(),
1307                            ..Default::default()
1308                        },
1309                        ResponseProtocolState {
1310                            component_id: "Component2".to_string(),
1311                            ..Default::default()
1312                        },
1313                    ],
1314                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1315                })
1316            });
1317        rpc_client
1318            .expect_get_component_tvl()
1319            .returning(|_| {
1320                Ok(ComponentTvlRequestResponse {
1321                    tvl: [
1322                        ("Component1".to_string(), 100.0),
1323                        ("Component2".to_string(), 0.0),
1324                        ("Component3".to_string(), 1000.0),
1325                    ]
1326                    .into_iter()
1327                    .collect(),
1328                    pagination: PaginationResponse { page: 0, page_size: 20, total: 3 },
1329                })
1330            });
1331        rpc_client
1332            .expect_get_traced_entry_points()
1333            .returning(|_| {
1334                Ok(TracedEntryPointRequestResponse {
1335                    traced_entry_points: HashMap::new(),
1336                    pagination: PaginationResponse::new(0, 20, 0),
1337                })
1338            });
1339
1340        // Mock deltas client and messages
1341        let mut deltas_client = MockDeltasClient::new();
1342        let (tx, rx) = channel(1);
1343        deltas_client
1344            .expect_subscribe()
1345            .return_once(move |_, _| {
1346                // Return subscriber id and a channel
1347                Ok((Uuid::default(), rx))
1348            });
1349
1350        // Expect unsubscribe call during cleanup
1351        deltas_client
1352            .expect_unsubscribe()
1353            .return_once(|_| Ok(()));
1354
1355        (rpc_client, deltas_client, tx)
1356    }
1357
    /// Test strategy
    ///
    /// - initial snapshot retrieval returns two components, Component1 and Component2, as
    ///   snapshots
    /// - the first delta message carries only a block and a DCI update
    /// - the second delta message carries tvl updates: a new component (Component3) shows up
    ///   with some significant tvl, one initial component (Component2) slips below the tvl
    ///   threshold, another one (Component1) is above tvl but does not get re-requested.
1364    #[test(tokio::test)]
1365    async fn test_state_sync() {
1366        let (rpc_client, deltas_client, tx) = mock_clients_for_state_sync();
1367        let deltas = [
1368            BlockChanges {
1369                extractor: "uniswap-v2".to_string(),
1370                chain: Chain::Ethereum,
1371                block: Block {
1372                    number: 1,
1373                    hash: Bytes::from("0x01"),
1374                    parent_hash: Bytes::from("0x00"),
1375                    chain: Chain::Ethereum,
1376                    ts: Default::default(),
1377                },
1378                revert: false,
1379                dci_update: DCIUpdate {
1380                    new_entrypoints: HashMap::from([(
1381                        "Component1".to_string(),
1382                        HashSet::from([EntryPoint {
1383                            external_id: "entrypoint_a".to_string(),
1384                            target: Bytes::from("0x0badc0ffee"),
1385                            signature: "sig()".to_string(),
1386                        }]),
1387                    )]),
1388                    new_entrypoint_params: HashMap::from([(
1389                        "entrypoint_a".to_string(),
1390                        HashSet::from([(
1391                            TracingParams::RPCTracer(RPCTracerParams {
1392                                caller: Some(Bytes::from("0x0badc0ffee")),
1393                                calldata: Bytes::from("0x0badc0ffee"),
1394                                state_overrides: None,
1395                                prune_addresses: None,
1396                            }),
1397                            Some("Component1".to_string()),
1398                        )]),
1399                    )]),
1400                    trace_results: HashMap::from([(
1401                        "entrypoint_a".to_string(),
1402                        TracingResult {
1403                            retriggers: HashSet::from([(
1404                                Bytes::from("0x0badc0ffee"),
1405                                AddressStorageLocation::new(Bytes::from("0x0badc0ffee"), 12),
1406                            )]),
1407                            accessed_slots: HashMap::from([(
1408                                Bytes::from("0x0badc0ffee"),
1409                                HashSet::from([Bytes::from("0xbadbeef0")]),
1410                            )]),
1411                        },
1412                    )]),
1413                },
1414                ..Default::default()
1415            },
1416            BlockChanges {
1417                extractor: "uniswap-v2".to_string(),
1418                chain: Chain::Ethereum,
1419                block: Block {
1420                    number: 2,
1421                    hash: Bytes::from("0x02"),
1422                    parent_hash: Bytes::from("0x01"),
1423                    chain: Chain::Ethereum,
1424                    ts: Default::default(),
1425                },
1426                revert: false,
1427                component_tvl: [
1428                    ("Component1".to_string(), 100.0),
1429                    ("Component2".to_string(), 0.0),
1430                    ("Component3".to_string(), 1000.0),
1431                ]
1432                .into_iter()
1433                .collect(),
1434                ..Default::default()
1435            },
1436        ];
1437        let mut state_sync = with_mocked_clients(true, true, Some(rpc_client), Some(deltas_client));
1438        state_sync
1439            .initialize()
1440            .await
1441            .expect("Init failed");
1442
1443        // Test starts here
1444        let (handle, mut rx) = state_sync.start().await;
1445        let (jh, close_tx) = handle.split();
1446        tx.send(deltas[0].clone())
1447            .await
1448            .expect("deltas channel msg 0 closed!");
1449        let first_msg = timeout(Duration::from_millis(100), rx.recv())
1450            .await
1451            .expect("waiting for first state msg timed out!")
1452            .expect("state sync block sender closed!");
1453        tx.send(deltas[1].clone())
1454            .await
1455            .expect("deltas channel msg 1 closed!");
1456        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1457            .await
1458            .expect("waiting for second state msg timed out!")
1459            .expect("state sync block sender closed!");
1460        let _ = close_tx.send(());
1461        jh.await
1462            .expect("state sync task panicked!");
1463
1464        // assertions
1465        let exp1 = StateSyncMessage {
1466            header: BlockHeader {
1467                number: 1,
1468                hash: Bytes::from("0x01"),
1469                parent_hash: Bytes::from("0x00"),
1470                revert: false,
1471                ..Default::default()
1472            },
1473            snapshots: Snapshot {
1474                states: [
1475                    (
1476                        "Component1".to_string(),
1477                        ComponentWithState {
1478                            state: ResponseProtocolState {
1479                                component_id: "Component1".to_string(),
1480                                ..Default::default()
1481                            },
1482                            component: ProtocolComponent {
1483                                id: "Component1".to_string(),
1484                                ..Default::default()
1485                            },
1486                            component_tvl: Some(100.0),
1487                            entrypoints: vec![],
1488                        },
1489                    ),
1490                    (
1491                        "Component2".to_string(),
1492                        ComponentWithState {
1493                            state: ResponseProtocolState {
1494                                component_id: "Component2".to_string(),
1495                                ..Default::default()
1496                            },
1497                            component: ProtocolComponent {
1498                                id: "Component2".to_string(),
1499                                ..Default::default()
1500                            },
1501                            component_tvl: Some(0.0),
1502                            entrypoints: vec![],
1503                        },
1504                    ),
1505                ]
1506                .into_iter()
1507                .collect(),
1508                vm_storage: HashMap::new(),
1509            },
1510            deltas: Some(deltas[0].clone()),
1511            removed_components: Default::default(),
1512        };
1513
1514        let exp2 = StateSyncMessage {
1515            header: BlockHeader {
1516                number: 2,
1517                hash: Bytes::from("0x02"),
1518                parent_hash: Bytes::from("0x01"),
1519                revert: false,
1520                ..Default::default()
1521            },
1522            snapshots: Snapshot {
1523                states: [
1524                    // This is the new component we queried once it passed the tvl threshold.
1525                    (
1526                        "Component3".to_string(),
1527                        ComponentWithState {
1528                            state: ResponseProtocolState {
1529                                component_id: "Component3".to_string(),
1530                                ..Default::default()
1531                            },
1532                            component: ProtocolComponent {
1533                                id: "Component3".to_string(),
1534                                ..Default::default()
1535                            },
1536                            component_tvl: Some(1000.0),
1537                            entrypoints: vec![],
1538                        },
1539                    ),
1540                ]
1541                .into_iter()
1542                .collect(),
1543                vm_storage: HashMap::new(),
1544            },
1545            // Our deltas are empty and since merge methods are
1546            // tested in tycho-common we don't have much to do here.
1547            deltas: Some(BlockChanges {
1548                extractor: "uniswap-v2".to_string(),
1549                chain: Chain::Ethereum,
1550                block: Block {
1551                    number: 2,
1552                    hash: Bytes::from("0x02"),
1553                    parent_hash: Bytes::from("0x01"),
1554                    chain: Chain::Ethereum,
1555                    ts: Default::default(),
1556                },
1557                revert: false,
1558                component_tvl: [
1559                    // "Component2" should not show here.
1560                    ("Component1".to_string(), 100.0),
1561                    ("Component3".to_string(), 1000.0),
1562                ]
1563                .into_iter()
1564                .collect(),
1565                ..Default::default()
1566            }),
1567            // "Component2" was removed, because its tvl changed to 0.
1568            removed_components: [(
1569                "Component2".to_string(),
1570                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1571            )]
1572            .into_iter()
1573            .collect(),
1574        };
1575        assert_eq!(first_msg.unwrap(), exp1);
1576        assert_eq!(second_msg.unwrap(), exp2);
1577    }
1578
1579    #[test(tokio::test)]
1580    async fn test_state_sync_with_tvl_range() {
1581        // Define the range for testing
1582        let remove_tvl_threshold = 5.0;
1583        let add_tvl_threshold = 7.0;
1584
1585        let mut rpc_client = MockRPCClient::new();
1586        let mut deltas_client = MockDeltasClient::new();
1587
1588        rpc_client
1589            .expect_get_protocol_components()
1590            .with(mockall::predicate::function(
1591                move |request_params: &ProtocolComponentsRequestBody| {
1592                    if let Some(ids) = request_params.component_ids.as_ref() {
1593                        ids.contains(&"Component3".to_string())
1594                    } else {
1595                        false
1596                    }
1597                },
1598            ))
1599            .returning(|_| {
1600                Ok(ProtocolComponentRequestResponse {
1601                    protocol_components: vec![ProtocolComponent {
1602                        id: "Component3".to_string(),
1603                        ..Default::default()
1604                    }],
1605                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1606                })
1607            });
1608        rpc_client
1609            .expect_get_protocol_states()
1610            .with(mockall::predicate::function(move |request_params: &ProtocolStateRequestBody| {
1611                let expected_id = "Component3".to_string();
1612                if let Some(ids) = request_params.protocol_ids.as_ref() {
1613                    ids.contains(&expected_id)
1614                } else {
1615                    false
1616                }
1617            }))
1618            .returning(|_| {
1619                Ok(ProtocolStateRequestResponse {
1620                    states: vec![ResponseProtocolState {
1621                        component_id: "Component3".to_string(),
1622                        ..Default::default()
1623                    }],
1624                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1625                })
1626            });
1627
1628        // Mock for the initial snapshot retrieval
1629        rpc_client
1630            .expect_get_protocol_components()
1631            .returning(|_| {
1632                Ok(ProtocolComponentRequestResponse {
1633                    protocol_components: vec![
1634                        ProtocolComponent { id: "Component1".to_string(), ..Default::default() },
1635                        ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1636                    ],
1637                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1638                })
1639            });
1640        rpc_client
1641            .expect_get_protocol_states()
1642            .returning(|_| {
1643                Ok(ProtocolStateRequestResponse {
1644                    states: vec![
1645                        ResponseProtocolState {
1646                            component_id: "Component1".to_string(),
1647                            ..Default::default()
1648                        },
1649                        ResponseProtocolState {
1650                            component_id: "Component2".to_string(),
1651                            ..Default::default()
1652                        },
1653                    ],
1654                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
1655                })
1656            });
1657        rpc_client
1658            .expect_get_traced_entry_points()
1659            .returning(|_| {
1660                Ok(TracedEntryPointRequestResponse {
1661                    traced_entry_points: HashMap::new(),
1662                    pagination: PaginationResponse::new(0, 20, 0),
1663                })
1664            });
1665
1666        rpc_client
1667            .expect_get_component_tvl()
1668            .returning(|_| {
1669                Ok(ComponentTvlRequestResponse {
1670                    tvl: [
1671                        ("Component1".to_string(), 6.0),
1672                        ("Component2".to_string(), 2.0),
1673                        ("Component3".to_string(), 10.0),
1674                    ]
1675                    .into_iter()
1676                    .collect(),
1677                    pagination: PaginationResponse { page: 0, page_size: 20, total: 3 },
1678                })
1679            });
1680
1681        rpc_client
1682            .expect_get_component_tvl()
1683            .returning(|_| {
1684                Ok(ComponentTvlRequestResponse {
1685                    tvl: [
1686                        ("Component1".to_string(), 6.0),
1687                        ("Component2".to_string(), 2.0),
1688                        ("Component3".to_string(), 10.0),
1689                    ]
1690                    .into_iter()
1691                    .collect(),
1692                    pagination: PaginationResponse { page: 0, page_size: 20, total: 3 },
1693                })
1694            });
1695
1696        let (tx, rx) = channel(1);
1697        deltas_client
1698            .expect_subscribe()
1699            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1700
1701        // Expect unsubscribe call during cleanup
1702        deltas_client
1703            .expect_unsubscribe()
1704            .return_once(|_| Ok(()));
1705
1706        let mut state_sync = ProtocolStateSynchronizer::new(
1707            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
1708            true,
1709            ComponentFilter::with_tvl_range(remove_tvl_threshold, add_tvl_threshold),
1710            1,
1711            Duration::from_secs(0),
1712            true,
1713            true,
1714            ArcRPCClient(Arc::new(rpc_client)),
1715            ArcDeltasClient(Arc::new(deltas_client)),
1716            10_u64,
1717        );
1718        state_sync
1719            .initialize()
1720            .await
1721            .expect("Init failed");
1722
1723        // Simulate the incoming BlockChanges
1724        let deltas = [
1725            BlockChanges {
1726                extractor: "uniswap-v2".to_string(),
1727                chain: Chain::Ethereum,
1728                block: Block {
1729                    number: 1,
1730                    hash: Bytes::from("0x01"),
1731                    parent_hash: Bytes::from("0x00"),
1732                    chain: Chain::Ethereum,
1733                    ts: Default::default(),
1734                },
1735                revert: false,
1736                ..Default::default()
1737            },
1738            BlockChanges {
1739                extractor: "uniswap-v2".to_string(),
1740                chain: Chain::Ethereum,
1741                block: Block {
1742                    number: 2,
1743                    hash: Bytes::from("0x02"),
1744                    parent_hash: Bytes::from("0x01"),
1745                    chain: Chain::Ethereum,
1746                    ts: Default::default(),
1747                },
1748                revert: false,
1749                component_tvl: [
1750                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1751                    ("Component2".to_string(), 2.0), // Below lower threshold, should be removed
1752                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1753                ]
1754                .into_iter()
1755                .collect(),
1756                ..Default::default()
1757            },
1758        ];
1759
1760        let (handle, mut rx) = state_sync.start().await;
1761        let (jh, close_tx) = handle.split();
1762
1763        // Simulate sending delta messages
1764        tx.send(deltas[0].clone())
1765            .await
1766            .expect("deltas channel msg 0 closed!");
1767
1768        // Expecting to receive the initial state message
1769        let _ = timeout(Duration::from_millis(100), rx.recv())
1770            .await
1771            .expect("waiting for first state msg timed out!")
1772            .expect("state sync block sender closed!");
1773
1774        // Send the third message, which should trigger TVL-based changes
1775        tx.send(deltas[1].clone())
1776            .await
1777            .expect("deltas channel msg 1 closed!");
1778        let second_msg = timeout(Duration::from_millis(100), rx.recv())
1779            .await
1780            .expect("waiting for second state msg timed out!")
1781            .expect("state sync block sender closed!")
1782            .expect("no error");
1783
1784        let _ = close_tx.send(());
1785        jh.await
1786            .expect("state sync task panicked!");
1787
1788        let expected_second_msg = StateSyncMessage {
1789            header: BlockHeader {
1790                number: 2,
1791                hash: Bytes::from("0x02"),
1792                parent_hash: Bytes::from("0x01"),
1793                revert: false,
1794                ..Default::default()
1795            },
1796            snapshots: Snapshot {
1797                states: [(
1798                    "Component3".to_string(),
1799                    ComponentWithState {
1800                        state: ResponseProtocolState {
1801                            component_id: "Component3".to_string(),
1802                            ..Default::default()
1803                        },
1804                        component: ProtocolComponent {
1805                            id: "Component3".to_string(),
1806                            ..Default::default()
1807                        },
1808                        component_tvl: Some(10.0),
1809                        entrypoints: vec![], // TODO: add entrypoints?
1810                    },
1811                )]
1812                .into_iter()
1813                .collect(),
1814                vm_storage: HashMap::new(),
1815            },
1816            deltas: Some(BlockChanges {
1817                extractor: "uniswap-v2".to_string(),
1818                chain: Chain::Ethereum,
1819                block: Block {
1820                    number: 2,
1821                    hash: Bytes::from("0x02"),
1822                    parent_hash: Bytes::from("0x01"),
1823                    chain: Chain::Ethereum,
1824                    ts: Default::default(),
1825                },
1826                revert: false,
1827                component_tvl: [
1828                    ("Component1".to_string(), 6.0), // Within range, should not trigger changes
1829                    ("Component3".to_string(), 10.0), // Above upper threshold, should be added
1830                ]
1831                .into_iter()
1832                .collect(),
1833                ..Default::default()
1834            }),
1835            removed_components: [(
1836                "Component2".to_string(),
1837                ProtocolComponent { id: "Component2".to_string(), ..Default::default() },
1838            )]
1839            .into_iter()
1840            .collect(),
1841        };
1842
1843        assert_eq!(second_msg, expected_second_msg);
1844    }
1845
1846    #[test(tokio::test)]
1847    async fn test_public_close_api_functionality() {
1848        // Tests the public close() API through the StateSynchronizer trait:
1849        // - close() fails before start() is called
1850        // - close() succeeds while synchronizer is running
1851        // - close() fails after already closed
1852        // This tests the full start/close lifecycle via the public API
1853
1854        let mut rpc_client = MockRPCClient::new();
1855        let mut deltas_client = MockDeltasClient::new();
1856
1857        // Mock the initial components call
1858        rpc_client
1859            .expect_get_protocol_components()
1860            .returning(|_| {
1861                Ok(ProtocolComponentRequestResponse {
1862                    protocol_components: vec![],
1863                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1864                })
1865            });
1866
1867        // Set up deltas client that will wait for messages (blocking in state_sync)
1868        let (_tx, rx) = channel(1);
1869        deltas_client
1870            .expect_subscribe()
1871            .return_once(move |_, _| Ok((Uuid::default(), rx)));
1872
1873        // Expect unsubscribe call during cleanup
1874        deltas_client
1875            .expect_unsubscribe()
1876            .return_once(|_| Ok(()));
1877
1878        let mut state_sync = ProtocolStateSynchronizer::new(
1879            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1880            true,
1881            ComponentFilter::with_tvl_range(0.0, 0.0),
1882            5, // Enough retries
1883            Duration::from_secs(0),
1884            true,
1885            false,
1886            ArcRPCClient(Arc::new(rpc_client)),
1887            ArcDeltasClient(Arc::new(deltas_client)),
1888            10000_u64, // Long timeout so task doesn't exit on its own
1889        );
1890
1891        state_sync
1892            .initialize()
1893            .await
1894            .expect("Init should succeed");
1895
1896        // Start the synchronizer and test the new split-based close mechanism
1897        let (handle, _rx) = state_sync.start().await;
1898        let (jh, close_tx) = handle.split();
1899
1900        // Give it time to start up and enter state_sync
1901        tokio::time::sleep(Duration::from_millis(100)).await;
1902
1903        // Send close signal should succeed
1904        close_tx
1905            .send(())
1906            .expect("Should be able to send close signal");
1907        // Task should stop cleanly
1908        jh.await.expect("Task should not panic");
1909    }
1910
1911    #[test(tokio::test)]
1912    async fn test_cleanup_runs_when_state_sync_processing_errors() {
1913        // Tests that cleanup code runs when state_sync() errors during delta processing.
1914        // Specifically tests: RPC errors during snapshot retrieval cause proper cleanup.
1915        // Verifies: shared.last_synced_block reset + subscription unsubscribe on errors
1916
1917        let mut rpc_client = MockRPCClient::new();
1918        let mut deltas_client = MockDeltasClient::new();
1919
1920        // Mock the initial components call
1921        rpc_client
1922            .expect_get_protocol_components()
1923            .returning(|_| {
1924                Ok(ProtocolComponentRequestResponse {
1925                    protocol_components: vec![],
1926                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
1927                })
1928            });
1929
1930        // Mock to fail during snapshot retrieval (this will cause an error during processing)
1931        rpc_client
1932            .expect_get_protocol_states()
1933            .returning(|_| {
1934                Err(RPCError::ParseResponse("Test error during snapshot retrieval".to_string()))
1935            });
1936
1937        // Set up deltas client to send one message that will trigger snapshot retrieval
1938        let (tx, rx) = channel(10);
1939        deltas_client
1940            .expect_subscribe()
1941            .return_once(move |_, _| {
1942                // Send a delta message that will require a snapshot
1943                let delta = BlockChanges {
1944                    extractor: "test".to_string(),
1945                    chain: Chain::Ethereum,
1946                    block: Block {
1947                        hash: Bytes::from("0x0123"),
1948                        number: 1,
1949                        parent_hash: Bytes::from("0x0000"),
1950                        chain: Chain::Ethereum,
1951                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
1952                            .unwrap()
1953                            .naive_utc(),
1954                    },
1955                    revert: false,
1956                    // Add a new component to trigger snapshot request
1957                    new_protocol_components: [(
1958                        "new_component".to_string(),
1959                        ProtocolComponent {
1960                            id: "new_component".to_string(),
1961                            protocol_system: "test_protocol".to_string(),
1962                            protocol_type_name: "test".to_string(),
1963                            chain: Chain::Ethereum,
1964                            tokens: vec![Bytes::from("0x0badc0ffee")],
1965                            contract_ids: vec![Bytes::from("0x0badc0ffee")],
1966                            static_attributes: Default::default(),
1967                            creation_tx: Default::default(),
1968                            created_at: Default::default(),
1969                            change: Default::default(),
1970                        },
1971                    )]
1972                    .into_iter()
1973                    .collect(),
1974                    component_tvl: [("new_component".to_string(), 100.0)]
1975                        .into_iter()
1976                        .collect(),
1977                    ..Default::default()
1978                };
1979
1980                tokio::spawn(async move {
1981                    let _ = tx.send(delta).await;
1982                    // Close the channel after sending one message
1983                });
1984
1985                Ok((Uuid::default(), rx))
1986            });
1987
1988        // Expect unsubscribe call during cleanup
1989        deltas_client
1990            .expect_unsubscribe()
1991            .return_once(|_| Ok(()));
1992
1993        let mut state_sync = ProtocolStateSynchronizer::new(
1994            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
1995            true,
1996            ComponentFilter::with_tvl_range(0.0, 1000.0), // Include the component
1997            1,
1998            Duration::from_secs(0),
1999            true,
2000            false,
2001            ArcRPCClient(Arc::new(rpc_client)),
2002            ArcDeltasClient(Arc::new(deltas_client)),
2003            5000_u64,
2004        );
2005
2006        state_sync
2007            .initialize()
2008            .await
2009            .expect("Init should succeed");
2010
2011        // Before calling state_sync, set a value in last_synced_block
2012        state_sync.last_synced_block = Some(BlockHeader {
2013            hash: Bytes::from("0x0badc0ffee"),
2014            number: 42,
2015            parent_hash: Bytes::from("0xbadbeef0"),
2016            revert: false,
2017            timestamp: 123456789,
2018        });
2019
2020        // Create a channel for state_sync to send messages to
2021        let (mut block_tx, _block_rx) = channel(10);
2022
2023        // Call state_sync directly - this should error during processing
2024        let (_end_tx, end_rx) = oneshot::channel::<()>();
2025        let result = state_sync
2026            .state_sync(&mut block_tx, end_rx)
2027            .await;
2028        // Verify that state_sync returned an error
2029        assert!(result.is_err(), "state_sync should have errored during processing");
2030
2031        // Note: We can't verify internal state cleanup since state_sync consumes self,
2032        // but the cleanup logic is still tested by the fact that the method returns properly.
2033    }
2034
2035    #[test(tokio::test)]
2036    async fn test_close_signal_while_waiting_for_first_deltas() {
2037        // Tests close signal handling during the initial "waiting for deltas" phase.
2038        // This is the earliest possible close scenario - before any deltas are received.
2039        // Verifies: close signal received while waiting for first message triggers cleanup
2040        let mut rpc_client = MockRPCClient::new();
2041        let mut deltas_client = MockDeltasClient::new();
2042
2043        rpc_client
2044            .expect_get_protocol_components()
2045            .returning(|_| {
2046                Ok(ProtocolComponentRequestResponse {
2047                    protocol_components: vec![],
2048                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2049                })
2050            });
2051
2052        let (_tx, rx) = channel(1);
2053        deltas_client
2054            .expect_subscribe()
2055            .return_once(move |_, _| Ok((Uuid::default(), rx)));
2056
2057        deltas_client
2058            .expect_unsubscribe()
2059            .return_once(|_| Ok(()));
2060
2061        let mut state_sync = ProtocolStateSynchronizer::new(
2062            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2063            true,
2064            ComponentFilter::with_tvl_range(0.0, 0.0),
2065            1,
2066            Duration::from_secs(0),
2067            true,
2068            false,
2069            ArcRPCClient(Arc::new(rpc_client)),
2070            ArcDeltasClient(Arc::new(deltas_client)),
2071            10000_u64,
2072        );
2073
2074        state_sync
2075            .initialize()
2076            .await
2077            .expect("Init should succeed");
2078
2079        let (mut block_tx, _block_rx) = channel(10);
2080        let (end_tx, end_rx) = oneshot::channel::<()>();
2081
2082        // Start state_sync in a task
2083        let state_sync_handle = tokio::spawn(async move {
2084            state_sync
2085                .state_sync(&mut block_tx, end_rx)
2086                .await
2087        });
2088
2089        // Give it a moment to start
2090        tokio::time::sleep(Duration::from_millis(100)).await;
2091
2092        // Send close signal
2093        let _ = end_tx.send(());
2094
2095        // state_sync should exit cleanly
2096        let result = state_sync_handle
2097            .await
2098            .expect("Task should not panic");
2099        assert!(result.is_ok(), "state_sync should exit cleanly when closed: {result:?}");
2100
2101        println!("SUCCESS: Close signal handled correctly while waiting for first deltas");
2102    }
2103
2104    #[test(tokio::test)]
2105    async fn test_close_signal_during_main_processing_loop() {
2106        // Tests close signal handling during the main delta processing loop.
2107        // This tests the scenario where first message is processed successfully,
2108        // then close signal is received while waiting for subsequent deltas.
2109        // Verifies: close signal in main loop (after initialization) triggers cleanup
2110
2111        let mut rpc_client = MockRPCClient::new();
2112        let mut deltas_client = MockDeltasClient::new();
2113
2114        // Mock the initial components call
2115        rpc_client
2116            .expect_get_protocol_components()
2117            .returning(|_| {
2118                Ok(ProtocolComponentRequestResponse {
2119                    protocol_components: vec![],
2120                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2121                })
2122            });
2123
2124        // Mock the snapshot retrieval that happens after first message
2125        rpc_client
2126            .expect_get_protocol_states()
2127            .returning(|_| {
2128                Ok(ProtocolStateRequestResponse {
2129                    states: vec![],
2130                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2131                })
2132            });
2133
2134        rpc_client
2135            .expect_get_component_tvl()
2136            .returning(|_| {
2137                Ok(ComponentTvlRequestResponse {
2138                    tvl: HashMap::new(),
2139                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2140                })
2141            });
2142
2143        rpc_client
2144            .expect_get_traced_entry_points()
2145            .returning(|_| {
2146                Ok(TracedEntryPointRequestResponse {
2147                    traced_entry_points: HashMap::new(),
2148                    pagination: PaginationResponse::new(0, 20, 0),
2149                })
2150            });
2151
2152        // Set up deltas client to send one message, then keep channel open
2153        let (tx, rx) = channel(10);
2154        deltas_client
2155            .expect_subscribe()
2156            .return_once(move |_, _| {
2157                // Send first message immediately
2158                let first_delta = BlockChanges {
2159                    extractor: "test".to_string(),
2160                    chain: Chain::Ethereum,
2161                    block: Block {
2162                        hash: Bytes::from("0x0123"),
2163                        number: 1,
2164                        parent_hash: Bytes::from("0x0000"),
2165                        chain: Chain::Ethereum,
2166                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2167                            .unwrap()
2168                            .naive_utc(),
2169                    },
2170                    revert: false,
2171                    ..Default::default()
2172                };
2173
2174                tokio::spawn(async move {
2175                    let _ = tx.send(first_delta).await;
2176                    // Keep the sender alive but don't send more messages
2177                    // This will make the recv() block waiting for the next message
2178                    tokio::time::sleep(Duration::from_secs(30)).await;
2179                });
2180
2181                Ok((Uuid::default(), rx))
2182            });
2183
2184        deltas_client
2185            .expect_unsubscribe()
2186            .return_once(|_| Ok(()));
2187
2188        let mut state_sync = ProtocolStateSynchronizer::new(
2189            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2190            true,
2191            ComponentFilter::with_tvl_range(0.0, 1000.0),
2192            1,
2193            Duration::from_secs(0),
2194            true,
2195            false,
2196            ArcRPCClient(Arc::new(rpc_client)),
2197            ArcDeltasClient(Arc::new(deltas_client)),
2198            10000_u64,
2199        );
2200
2201        state_sync
2202            .initialize()
2203            .await
2204            .expect("Init should succeed");
2205
2206        let (mut block_tx, mut block_rx) = channel(10);
2207        let (end_tx, end_rx) = oneshot::channel::<()>();
2208
2209        // Start state_sync in a task
2210        let state_sync_handle = tokio::spawn(async move {
2211            state_sync
2212                .state_sync(&mut block_tx, end_rx)
2213                .await
2214        });
2215
2216        // Wait for the first message to be processed (snapshot sent)
2217        let first_snapshot = block_rx
2218            .recv()
2219            .await
2220            .expect("Should receive first snapshot")
2221            .expect("Synchronizer error");
2222        assert!(
2223            !first_snapshot
2224                .snapshots
2225                .states
2226                .is_empty() ||
2227                first_snapshot.deltas.is_some()
2228        );
2229        // Now send close signal - this should be handled in the main processing loop
2230        let _ = end_tx.send(());
2231
2232        // state_sync should exit cleanly after receiving close signal in main loop
2233        let result = state_sync_handle
2234            .await
2235            .expect("Task should not panic");
2236        assert!(
2237            result.is_ok(),
2238            "state_sync should exit cleanly when closed after first message: {result:?}"
2239        );
2240    }
2241
2242    #[test(tokio::test)]
2243    async fn test_max_retries_exceeded_error_propagation() {
2244        // Test that when max_retries is exceeded, the final error is sent through the channel
2245        // to the receiver and the synchronizer task exits cleanly
2246
2247        let mut rpc_client = MockRPCClient::new();
2248        let mut deltas_client = MockDeltasClient::new();
2249
2250        // Mock the initial components call to succeed
2251        rpc_client
2252            .expect_get_protocol_components()
2253            .returning(|_| {
2254                Ok(ProtocolComponentRequestResponse {
2255                    protocol_components: vec![],
2256                    pagination: PaginationResponse { page: 0, page_size: 20, total: 0 },
2257                })
2258            });
2259
2260        // Set up deltas client to consistently fail after subscription
2261        // This will cause connection errors and trigger retries
2262        deltas_client
2263            .expect_subscribe()
2264            .returning(|_, _| {
2265                // Return a connection error to trigger retries
2266                Err(DeltasError::NotConnected)
2267            });
2268
2269        // Expect multiple unsubscribe calls during retries
2270        deltas_client
2271            .expect_unsubscribe()
2272            .returning(|_| Ok(()))
2273            .times(0..=5);
2274
2275        // Create synchronizer with only 2 retries and short cooldown
2276        let mut state_sync = ProtocolStateSynchronizer::new(
2277            ExtractorIdentity::new(Chain::Ethereum, "test-protocol"),
2278            true,
2279            ComponentFilter::with_tvl_range(0.0, 1000.0),
2280            2,                         // max_retries = 2
2281            Duration::from_millis(10), // short retry cooldown
2282            true,
2283            false,
2284            ArcRPCClient(Arc::new(rpc_client)),
2285            ArcDeltasClient(Arc::new(deltas_client)),
2286            1000_u64,
2287        );
2288
2289        state_sync
2290            .initialize()
2291            .await
2292            .expect("Init should succeed");
2293
2294        // Start the synchronizer - it should fail to subscribe and retry
2295        let (handle, mut rx) = state_sync.start().await;
2296        let (jh, _close_tx) = handle.split();
2297
2298        let res = tokio::time::timeout(Duration::from_millis(100), rx.recv())
2299            .await
2300            .expect("responsds in time")
2301            .expect("channel open");
2302
2303        // Verify the error is a ConnectionClosed error (converted from DeltasError::NotConnected)
2304        if let Err(err) = res {
2305            assert!(
2306                matches!(err, SynchronizerError::ConnectionClosed),
2307                "Expected ConnectionClosed error, got: {:?}",
2308                err
2309            );
2310        } else {
2311            panic!("Expected an error")
2312        }
2313
2314        // The task should complete (not hang) after max retries
2315        let task_result = tokio::time::timeout(Duration::from_secs(2), jh).await;
2316        assert!(task_result.is_ok(), "Synchronizer task should complete after max retries");
2317    }
2318
2319    #[test(tokio::test)]
2320    async fn test_is_next_expected() {
2321        // Test the is_next_expected function to ensure it correctly identifies
2322        // when an incoming block is the expected next block in the chain
2323
2324        let mut state_sync = with_mocked_clients(true, false, None, None);
2325
2326        // Test 1: No previous block - should return false
2327        let incoming_header = BlockHeader {
2328            number: 100,
2329            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2330            parent_hash: Bytes::from(
2331                "0x0000000000000000000000000000000000000000000000000000000000000000",
2332            ),
2333            revert: false,
2334            timestamp: 123456789,
2335        };
2336        assert!(
2337            !state_sync.is_next_expected(&incoming_header),
2338            "Should return false when no previous block is set"
2339        );
2340
2341        // Test 2: Set a previous block and test with matching parent hash
2342        let previous_header = BlockHeader {
2343            number: 99,
2344            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000000"),
2345            parent_hash: Bytes::from(
2346                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
2347            ),
2348            revert: false,
2349            timestamp: 123456788,
2350        };
2351        state_sync.last_synced_block = Some(previous_header.clone());
2352
2353        assert!(
2354            state_sync.is_next_expected(&incoming_header),
2355            "Should return true when incoming parent_hash matches previous hash"
2356        );
2357
2358        // Test 3: Test with non-matching parent hash
2359        let non_matching_header = BlockHeader {
2360            number: 100,
2361            hash: Bytes::from("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"),
2362            parent_hash: Bytes::from(
2363                "0x1111111111111111111111111111111111111111111111111111111111111111",
2364            ), // Wrong parent hash
2365            revert: false,
2366            timestamp: 123456789,
2367        };
2368        assert!(
2369            !state_sync.is_next_expected(&non_matching_header),
2370            "Should return false when incoming parent_hash doesn't match previous hash"
2371        );
2372    }
2373
2374    #[test(tokio::test)]
2375    async fn test_synchronizer_restart_skip_snapshot_on_expected_block() {
2376        // Test that on synchronizer restart with the next expected block,
2377        // get_snapshot is not called and only deltas are sent
2378
2379        let mut rpc_client = MockRPCClient::new();
2380        let mut deltas_client = MockDeltasClient::new();
2381
2382        // Mock the initial components call
2383        rpc_client
2384            .expect_get_protocol_components()
2385            .returning(|_| {
2386                Ok(ProtocolComponentRequestResponse {
2387                    protocol_components: vec![ProtocolComponent {
2388                        id: "Component1".to_string(),
2389                        ..Default::default()
2390                    }],
2391                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2392                })
2393            });
2394
2395        // Set up deltas client to send a message that is the next expected block
2396        let (tx, rx) = channel(10);
2397        deltas_client
2398            .expect_subscribe()
2399            .return_once(move |_, _| {
2400                let expected_next_delta = BlockChanges {
2401                    extractor: "uniswap-v2".to_string(),
2402                    chain: Chain::Ethereum,
2403                    block: Block {
2404                        hash: Bytes::from(
2405                            "0x0000000000000000000000000000000000000000000000000000000000000002",
2406                        ), // This will be the next expected block
2407                        number: 2,
2408                        parent_hash: Bytes::from(
2409                            "0x0000000000000000000000000000000000000000000000000000000000000001",
2410                        ), // This matches our last synced block hash
2411                        chain: Chain::Ethereum,
2412                        ts: chrono::DateTime::from_timestamp(1234567890, 0)
2413                            .unwrap()
2414                            .naive_utc(),
2415                    },
2416                    revert: false,
2417                    ..Default::default()
2418                };
2419
2420                tokio::spawn(async move {
2421                    let _ = tx.send(expected_next_delta).await;
2422                });
2423
2424                Ok((Uuid::default(), rx))
2425            });
2426
2427        deltas_client
2428            .expect_unsubscribe()
2429            .return_once(|_| Ok(()));
2430
2431        let mut state_sync = ProtocolStateSynchronizer::new(
2432            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2433            true,
2434            ComponentFilter::with_tvl_range(0.0, 1000.0),
2435            1,
2436            Duration::from_secs(0),
2437            true, // include_snapshots = true
2438            false,
2439            ArcRPCClient(Arc::new(rpc_client)),
2440            ArcDeltasClient(Arc::new(deltas_client)),
2441            10000_u64,
2442        );
2443
2444        // Initialize and set up the last synced block to simulate a restart scenario
2445        state_sync
2446            .initialize()
2447            .await
2448            .expect("Init should succeed");
2449
2450        // Set last_synced_block to simulate that we've previously synced block 1
2451        state_sync.last_synced_block = Some(BlockHeader {
2452            number: 1,
2453            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001"), /* This matches the parent_hash in our delta */
2454            parent_hash: Bytes::from(
2455                "0x0000000000000000000000000000000000000000000000000000000000000000",
2456            ),
2457            revert: false,
2458            timestamp: 123456789,
2459        });
2460
2461        let (mut block_tx, mut block_rx) = channel(10);
2462        let (end_tx, end_rx) = oneshot::channel::<()>();
2463
2464        // Start state_sync
2465        let state_sync_handle = tokio::spawn(async move {
2466            state_sync
2467                .state_sync(&mut block_tx, end_rx)
2468                .await
2469        });
2470
2471        // Wait for the message - it should be a delta-only message (no snapshots)
2472        let result_msg = timeout(Duration::from_millis(200), block_rx.recv())
2473            .await
2474            .expect("Should receive message within timeout")
2475            .expect("Channel should be open")
2476            .expect("Should not be an error");
2477
2478        // Send close signal
2479        let _ = end_tx.send(());
2480
2481        // Wait for state_sync to finish
2482        let _ = state_sync_handle
2483            .await
2484            .expect("Task should not panic");
2485
2486        // Verify the message contains deltas but no snapshots
2487        // (because we skipped snapshot retrieval)
2488        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2489        assert!(
2490            result_msg.snapshots.states.is_empty(),
2491            "Should not contain snapshots when next expected block is received"
2492        );
2493
2494        // Verify the block details match our expected next block
2495        if let Some(deltas) = &result_msg.deltas {
2496            assert_eq!(deltas.block.number, 2);
2497            assert_eq!(
2498                deltas.block.hash,
2499                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002")
2500            );
2501            assert_eq!(
2502                deltas.block.parent_hash,
2503                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000001")
2504            );
2505        }
2506    }
2507
2508    #[test(tokio::test)]
2509    async fn test_skip_previously_processed_messages() {
2510        // Test that the synchronizer skips messages for blocks that have already been processed
2511        // This simulates a service restart scenario where old messages are re-emitted
2512
2513        let mut rpc_client = MockRPCClient::new();
2514        let mut deltas_client = MockDeltasClient::new();
2515
2516        // Mock the initial components call
2517        rpc_client
2518            .expect_get_protocol_components()
2519            .returning(|_| {
2520                Ok(ProtocolComponentRequestResponse {
2521                    protocol_components: vec![ProtocolComponent {
2522                        id: "Component1".to_string(),
2523                        ..Default::default()
2524                    }],
2525                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2526                })
2527            });
2528
2529        // Mock snapshot calls for when we process the expected next block (block 6)
2530        rpc_client
2531            .expect_get_protocol_states()
2532            .returning(|_| {
2533                Ok(ProtocolStateRequestResponse {
2534                    states: vec![ResponseProtocolState {
2535                        component_id: "Component1".to_string(),
2536                        ..Default::default()
2537                    }],
2538                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2539                })
2540            });
2541
2542        rpc_client
2543            .expect_get_component_tvl()
2544            .returning(|_| {
2545                Ok(ComponentTvlRequestResponse {
2546                    tvl: [("Component1".to_string(), 100.0)]
2547                        .into_iter()
2548                        .collect(),
2549                    pagination: PaginationResponse { page: 0, page_size: 20, total: 1 },
2550                })
2551            });
2552
2553        rpc_client
2554            .expect_get_traced_entry_points()
2555            .returning(|_| {
2556                Ok(TracedEntryPointRequestResponse {
2557                    traced_entry_points: HashMap::new(),
2558                    pagination: PaginationResponse::new(0, 20, 0),
2559                })
2560            });
2561
2562        // Set up deltas client to send old messages first, then the expected next block
2563        let (tx, rx) = channel(10);
2564        deltas_client
2565            .expect_subscribe()
2566            .return_once(move |_, _| {
2567                // Send messages for blocks 3, 4, 5 (already processed), then block 6 (expected)
2568                let old_messages = vec![
2569                    BlockChanges {
2570                        extractor: "uniswap-v2".to_string(),
2571                        chain: Chain::Ethereum,
2572                        block: Block {
2573                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2574                            number: 3,
2575                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000002"),
2576                            chain: Chain::Ethereum,
2577                            ts: chrono::DateTime::from_timestamp(1234567890, 0).unwrap().naive_utc(),
2578                        },
2579                        revert: false,
2580                        ..Default::default()
2581                    },
2582                    BlockChanges {
2583                        extractor: "uniswap-v2".to_string(),
2584                        chain: Chain::Ethereum,
2585                        block: Block {
2586                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2587                            number: 4,
2588                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000003"),
2589                            chain: Chain::Ethereum,
2590                            ts: chrono::DateTime::from_timestamp(1234567891, 0).unwrap().naive_utc(),
2591                        },
2592                        revert: false,
2593                        ..Default::default()
2594                    },
2595                    BlockChanges {
2596                        extractor: "uniswap-v2".to_string(),
2597                        chain: Chain::Ethereum,
2598                        block: Block {
2599                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2600                            number: 5,
2601                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000004"),
2602                            chain: Chain::Ethereum,
2603                            ts: chrono::DateTime::from_timestamp(1234567892, 0).unwrap().naive_utc(),
2604                        },
2605                        revert: false,
2606                        ..Default::default()
2607                    },
2608                    // This is the expected next block (block 6)
2609                    BlockChanges {
2610                        extractor: "uniswap-v2".to_string(),
2611                        chain: Chain::Ethereum,
2612                        block: Block {
2613                            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006"),
2614                            number: 6,
2615                            parent_hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2616                            chain: Chain::Ethereum,
2617                            ts: chrono::DateTime::from_timestamp(1234567893, 0).unwrap().naive_utc(),
2618                        },
2619                        revert: false,
2620                        ..Default::default()
2621                    },
2622                ];
2623
2624                tokio::spawn(async move {
2625                    for message in old_messages {
2626                        let _ = tx.send(message).await;
2627                        tokio::time::sleep(Duration::from_millis(10)).await;
2628                    }
2629                });
2630
2631                Ok((Uuid::default(), rx))
2632            });
2633
2634        deltas_client
2635            .expect_unsubscribe()
2636            .return_once(|_| Ok(()));
2637
2638        let mut state_sync = ProtocolStateSynchronizer::new(
2639            ExtractorIdentity::new(Chain::Ethereum, "uniswap-v2"),
2640            true,
2641            ComponentFilter::with_tvl_range(0.0, 1000.0),
2642            1,
2643            Duration::from_secs(0),
2644            true,
2645            true,
2646            ArcRPCClient(Arc::new(rpc_client)),
2647            ArcDeltasClient(Arc::new(deltas_client)),
2648            10000_u64,
2649        );
2650
2651        // Initialize and set last_synced_block to simulate we've already processed block 5
2652        state_sync
2653            .initialize()
2654            .await
2655            .expect("Init should succeed");
2656
2657        state_sync.last_synced_block = Some(BlockHeader {
2658            number: 5,
2659            hash: Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000005"),
2660            parent_hash: Bytes::from(
2661                "0x0000000000000000000000000000000000000000000000000000000000000004",
2662            ),
2663            revert: false,
2664            timestamp: 1234567892,
2665        });
2666
2667        let (mut block_tx, mut block_rx) = channel(10);
2668        let (end_tx, end_rx) = oneshot::channel::<()>();
2669
2670        // Start state_sync
2671        let state_sync_handle = tokio::spawn(async move {
2672            state_sync
2673                .state_sync(&mut block_tx, end_rx)
2674                .await
2675        });
2676
2677        // Wait for the message - it should only be for block 6 (skipping blocks 3, 4, 5)
2678        let result_msg = timeout(Duration::from_millis(500), block_rx.recv())
2679            .await
2680            .expect("Should receive message within timeout")
2681            .expect("Channel should be open")
2682            .expect("Should not be an error");
2683
2684        // Send close signal
2685        let _ = end_tx.send(());
2686
2687        // Wait for state_sync to finish
2688        let _ = state_sync_handle
2689            .await
2690            .expect("Task should not panic");
2691
2692        // Verify we only got the message for block 6 (the expected next block)
2693        assert!(result_msg.deltas.is_some(), "Should contain deltas");
2694        if let Some(deltas) = &result_msg.deltas {
2695            assert_eq!(
2696                deltas.block.number, 6,
2697                "Should only process block 6, skipping earlier blocks"
2698            );
2699            assert_eq!(
2700                deltas.block.hash,
2701                Bytes::from("0x0000000000000000000000000000000000000000000000000000000000000006")
2702            );
2703        }
2704
2705        // Verify that no additional messages are received immediately
2706        // (since the old blocks 3, 4, 5 were skipped and only block 6 was processed)
2707        match timeout(Duration::from_millis(50), block_rx.recv()).await {
2708            Err(_) => {
2709                // Timeout is expected - no more messages should come
2710            }
2711            Ok(Some(Err(_))) => {
2712                // Error received is also acceptable (connection closed)
2713            }
2714            Ok(Some(Ok(_))) => {
2715                panic!("Should not receive additional messages - old blocks should be skipped");
2716            }
2717            Ok(None) => {
2718                // Channel closed is also acceptable
2719            }
2720        }
2721    }
2722}