Skip to main content

tycho_simulation/evm/
stream.rs

1//! Builder for configuring a multi-protocol stream.
2//!
3//! Provides a builder for creating a multi-protocol stream that produces
4//! protocol state update messages. It runs one synchronization worker per protocol
5//! and a supervisor that aggregates updates, ensuring gap‑free streaming
6//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps a `TychoStream` from `tycho-client`. It decodes `FeedMessage`s
11//! into protocol state updates. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//!   set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
29//! Aggregating by block adds small latency, since the supervisor waits briefly for
30//! all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
31//!
32//! The stream ends only when **all** workers are `Stale` or `Ended`.
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide token metadata up front so the decoder can initialize protocol states from startup
43//! snapshots. `set_tokens` does not act as an ongoing filter — components arriving after startup
44//! include their own token metadata. To restrict processing to specific tokens, apply that filter
45//! in your consumer when reading `new_components`. New tokens arriving via stream deltas are added
//! automatically when their quality is at least `min_token_quality`.
47//!
48//! ### StreamEndPolicy
49//! Control when the stream ends based on worker states. By default, it ends when all
50//! workers are `Stale` or `Ended`.
51//!
52//! ## Stream
53//! The stream emits one protocol state update every `block_time`. Each update
54//! reports protocol synchronization states and any changes.
55//!
56//! The `new_components` field lists newly deployed components and their tokens.
57//!
58//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
59//! most errors, so users should rarely need to restart it manually.
60//!
61//! ## Example
62//! ```no_run
63//! use tycho_common::models::Chain;
64//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
65//! use tycho_simulation::utils::load_all_tokens;
66//! use futures::StreamExt;
67//! use tycho_client::feed::component_tracker::ComponentFilter;
68//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
69//! use std::collections::HashSet;
70//!
71//! #[tokio::main]
72//! async fn main() {
73//!     let all_tokens = load_all_tokens(
74//!         "tycho-beta.propellerheads.xyz",
75//!         false,
76//!         Some("sampletoken"),
77//!         true,
78//!         Chain::Ethereum,
79//!         None,
80//!         None,
81//!     )
82//!     .await
83//!     .expect("Failed loading tokens");
84//!
85//!     let protocol_stream =
86//!         ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
87//!             .auth_key(Some("sampletoken".to_string()))
88//!             .skip_state_decode_failures(true)
89//!             .exchange::<UniswapV2State>(
90//!                 "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
91//!             )
92//!             .blocklist_components(HashSet::new())
93//!             .set_tokens(all_tokens)
94//!             .await
95//!             .build()
96//!             .await
97//!             .expect("Failed building protocol stream");
98//!     tokio::pin!(protocol_stream);
99//!
100//!     // Loop through block updates
101//!     while let Some(msg) = protocol_stream.next().await {
102//!         dbg!(msg).expect("failed decoding");
103//!     }
104//! }
105//! ```
106use std::{
107    collections::{HashMap, HashSet},
108    sync::Arc,
109    time,
110};
111
112use futures::{future::Either, stream, Stream, StreamExt};
113use tokio_stream::wrappers::ReceiverStream;
114use tracing::{debug, error, warn};
115use tycho_client::{
116    feed::{
117        component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
118        FeedMessage, SynchronizerState,
119    },
120    stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
121};
122use tycho_common::{
123    models::{token::Token, Chain},
124    simulation::protocol_sim::ProtocolSim,
125    traits::TxDeltaIndexer,
126    Bytes,
127};
128
129use crate::{
130    evm::{
131        decoder::{StreamDecodeError, TychoStreamDecoder},
132        pending::PendingBlockProcessor,
133        protocol::{
134            native_wrapper::state::{NativeWrapperState, NATIVE_WRAPPER_ID},
135            uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
136        },
137    },
138    protocol::{
139        errors::InvalidSnapshotError,
140        models::{DecoderContext, TryFromWithBlock, Update},
141    },
142};
143
/// Exchange names for which a client-side filter function is expected.
///
/// When one of these names is registered without a `filter_fn`, the builder logs a
/// warning (it does not reject the registration).
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
145
/// Policy describing which synchronizer states cause the protocol stream to end early.
#[derive(Debug, Default, Clone, Copy)]
pub enum StreamEndPolicy {
    /// Ends the stream once every protocol is `Stale` or `Ended`. This is the default.
    #[default]
    AllEndedOrStale,
    /// Ends the stream as soon as at least one protocol is `Ended`.
    AnyEnded,
    /// Ends the stream as soon as at least one protocol is `Ended` or `Stale`.
    AnyEndedOrStale,
    /// Ends the stream as soon as at least one protocol is `Stale`.
    AnyStale,
}
158
159impl StreamEndPolicy {
160    fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
161        let mut it = states.into_iter();
162        match self {
163            StreamEndPolicy::AllEndedOrStale => false,
164            StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
165            StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
166            StreamEndPolicy::AnyEndedOrStale => {
167                it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
168            }
169        }
170    }
171}
172
/// Builds and configures the multi protocol stream described in the [module-level docs](self).
///
/// See the module documentation for details on protocols, configuration options, and
/// stream behavior.
pub struct ProtocolStreamBuilder {
    /// Decoder that turns raw feed messages into `Update`s; per-exchange decoders and
    /// filters are registered on it by the `exchange*` methods.
    decoder: TychoStreamDecoder<BlockHeader>,
    /// Underlying `tycho-client` stream builder; most setters simply forward to it.
    stream_builder: TychoStreamBuilder,
    /// Policy consulted for every message to decide whether the stream should end early.
    stream_end_policy: StreamEndPolicy,
    /// Chain the stream is built for; also used when injecting the native wrapper component.
    chain: Chain,
    /// Pending-block indexers keyed by extractor name, handed to the
    /// `PendingBlockProcessor` in `build_with_pending`.
    pending_indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
}
184
185impl ProtocolStreamBuilder {
186    /// Creates a new builder for a multi-protocol stream.
187    ///
188    /// See the [module-level docs](self) for full details on stream behavior and configuration.
189    pub fn new(tycho_url: &str, chain: Chain) -> Self {
190        Self {
191            decoder: TychoStreamDecoder::new(),
192            stream_builder: TychoStreamBuilder::new(tycho_url, chain),
193            stream_end_policy: StreamEndPolicy::default(),
194            chain,
195            pending_indexers: HashMap::new(),
196        }
197    }
198
199    /// Adds a specific exchange to the stream.
200    ///
201    /// This configures the builder to include a new protocol synchronizer for `name`,
202    /// filtering its components according to `filter` and optionally `filter_fn`.
203    ///
204    /// The type parameter `T` specifies the decoder type for this exchange. All
205    /// component states for this exchange will be decoded into instances of `T`.
206    ///
207    /// # Parameters
208    ///
209    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
210    /// - `filter`: Defines the set of components to include in the stream.
211    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
212    ///   expressible in `filter`.
213    ///
214    /// # Notes
215    ///
216    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
217    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
218    /// filter function is required to ensure correct decoding and quoting logic.
219    pub fn exchange<T>(
220        mut self,
221        name: &str,
222        filter: ComponentFilter,
223        filter_fn: Option<fn(&ComponentWithState) -> bool>,
224    ) -> Self
225    where
226        T: ProtocolSim
227            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
228            + Send
229            + 'static,
230    {
231        self.stream_builder = self
232            .stream_builder
233            .exchange(name, filter);
234        self.decoder.register_decoder::<T>(name);
235        if let Some(predicate) = filter_fn {
236            self.decoder
237                .register_filter(name, predicate);
238        }
239
240        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
241            warn!(
242                "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
243                name
244            );
245        }
246
247        self
248    }
249
250    /// Adds a specific exchange to the stream with decoder context.
251    ///
252    /// This configures the builder to include a new protocol synchronizer for `name`,
253    /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
254    /// the DecoderContext (this is useful to test protocols that are not live yet)
255    ///
256    /// The type parameter `T` specifies the decoder type for this exchange. All
257    /// component states for this exchange will be decoded into instances of `T`.
258    ///
259    /// # Parameters
260    ///
261    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
262    /// - `filter`: Defines the set of components to include in the stream.
263    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
264    ///   expressible in `filter`.
265    /// - `decoder_context`: The decoder context for this exchange
266    ///
267    /// # Notes
268    ///
269    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
270    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
271    /// filter function is required to ensure correct decoding and quoting logic.
272    pub fn exchange_with_decoder_context<T>(
273        mut self,
274        name: &str,
275        filter: ComponentFilter,
276        filter_fn: Option<fn(&ComponentWithState) -> bool>,
277        decoder_context: DecoderContext,
278    ) -> Self
279    where
280        T: ProtocolSim
281            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
282            + Send
283            + 'static,
284    {
285        self.stream_builder = self
286            .stream_builder
287            .exchange(name, filter);
288        self.decoder
289            .register_decoder_with_context::<T>(name, decoder_context);
290        if let Some(predicate) = filter_fn {
291            self.decoder
292                .register_filter(name, predicate);
293        }
294
295        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
296            warn!(
297                "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
298                name
299            );
300        }
301
302        self
303    }
304
305    /// Sets the block time interval for the stream.
306    ///
307    /// This controls how often the stream produces updates.
308    pub fn block_time(mut self, block_time: u64) -> Self {
309        self.stream_builder = self
310            .stream_builder
311            .block_time(block_time);
312        self
313    }
314
315    /// Sets the network operation timeout (deprecated).
316    ///
317    /// Use [`latency_buffer()`](Self::latency_buffer) instead for controlling latency.
318    /// This method is retained for backwards compatibility.
319    #[deprecated = "Use latency_buffer instead"]
320    pub fn timeout(mut self, timeout: u64) -> Self {
321        self.stream_builder = self.stream_builder.timeout(timeout);
322        self
323    }
324
325    /// Sets the latency buffer to aggregate same-block messages.
326    ///
327    /// This allows the supervisor to wait a short interval for all synchronizers to emit
328    /// before aggregating.
329    pub fn latency_buffer(mut self, timeout: u64) -> Self {
330        self.stream_builder = self.stream_builder.timeout(timeout);
331        self
332    }
333
334    /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
335    pub fn max_missed_blocks(mut self, n: u64) -> Self {
336        self.stream_builder = self.stream_builder.max_missed_blocks(n);
337        self
338    }
339
340    /// Sets how long a synchronizer may take to process the initial message.
341    ///
342    /// Useful for data-intensive protocols where startup decoding takes longer.
343    pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
344        self.stream_builder = self
345            .stream_builder
346            .startup_timeout(timeout);
347        self
348    }
349
350    /// Configures the stream to exclude state updates.
351    ///
352    /// This reduces bandwidth and decoding workload if protocol state is not of
353    /// interest (e.g. only process new tokens).
354    pub fn no_state(mut self, no_state: bool) -> Self {
355        self.stream_builder = self.stream_builder.no_state(no_state);
356        self
357    }
358
359    /// Sets the API key for authenticating with the Tycho server.
360    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
361        self.stream_builder = self.stream_builder.auth_key(auth_key);
362        self
363    }
364
365    /// Disables TLS/ SSL for the connection, using http and ws protocols.
366    ///
367    /// This is not recommended for production use.
368    pub fn no_tls(mut self, no_tls: bool) -> Self {
369        self.stream_builder = self.stream_builder.no_tls(no_tls);
370        self
371    }
372
373    /// Disable compression for the connection.
374    pub fn disable_compression(mut self) -> Self {
375        self.stream_builder = self
376            .stream_builder
377            .disable_compression();
378        self
379    }
380
381    /// Enables partial block updates (flashblocks).
382    pub fn enable_partial_blocks(mut self) -> Self {
383        self.stream_builder = self
384            .stream_builder
385            .enable_partial_blocks();
386        self
387    }
388
389    /// Exclude specific component IDs from all registered exchanges.
390    pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
391        if !ids.is_empty() {
392            tracing::info!("Blocklisting {} components", ids.len());
393            self.stream_builder = self.stream_builder.blocklisted_ids(ids);
394        }
395        self
396    }
397
398    /// Sets the stream end policy.
399    ///
400    /// Controls when the stream should stop based on synchronizer states.
401    ///
402    /// ## Note
403    /// The stream always ends latest if all protocols are stale or ended independent of
404    /// this configuration. This allows you to end the stream earlier than that.
405    ///
406    /// See [self::StreamEndPolicy] for possible configuration options.
407    pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
408        self.stream_end_policy = stream_end_policy;
409        self
410    }
411
412    /// Provides token metadata used to decode startup snapshots and initialize protocol states.
413    ///
414    /// This is not a stream filter — components arriving after startup include their own token
415    /// metadata. To restrict to specific tokens, filter in your consumer logic. New tokens
416    /// arriving via stream deltas are added automatically if they meet the quality threshold.
417    pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
418        self.decoder.set_tokens(tokens).await;
419        self
420    }
421
422    /// Skips decoding errors for component state updates.
423    ///
424    /// Allows the stream to continue processing even if some states fail to decode,
425    /// logging a warning instead of panicking.
426    pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
427        self.decoder
428            .skip_state_decode_failures(skip);
429        self
430    }
431
432    /// Sets the minimum token quality for tokens added via the stream.
433    ///
434    /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
435    /// Set this to the same value used in [`load_all_tokens()`](crate::utils::load_all_tokens) to
436    /// apply consistent filtering.
437    pub fn min_token_quality(mut self, quality: u32) -> Self {
438        self.decoder.min_token_quality(quality);
439        self
440    }
441
442    /// Configures the retry policy for websocket reconnects.
443    pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
444        self.stream_builder = self
445            .stream_builder
446            .websockets_retry_config(config);
447        self
448    }
449
450    /// Configures the retry policy for state synchronization.
451    pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
452        self.stream_builder = self
453            .stream_builder
454            .state_synchronizer_retry_config(config);
455        self
456    }
457
458    pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
459        &self.decoder
460    }
461
462    /// Registers a [`TxDeltaIndexer`] for ephemeral pending-block simulation.
463    ///
464    /// The indexer is associated with `extractor` (the protocol synchronizer name, e.g.
465    /// `"uniswap_v3"`). Use [`build_with_pending`](Self::build_with_pending) to obtain both
466    /// the confirmed stream and the pending processor.
467    ///
468    /// Returns an error if `extractor` names a VM protocol (prefix `"vm:"`), which requires
469    /// `update_engine()` and cannot be simulated natively.
470    pub fn with_pending_indexer(
471        mut self,
472        extractor: &str,
473        indexer: Box<dyn TxDeltaIndexer>,
474    ) -> Result<Self, StreamError> {
475        if extractor.starts_with("vm:") {
476            return Err(StreamError::SetUpError(format!(
477                "extractor '{extractor}' is a VM protocol; TxDeltaIndexer only supports native protocols"
478            )));
479        }
480        self.pending_indexers
481            .insert(extractor.to_string(), indexer);
482        Ok(self)
483    }
484
485    /// Builds the confirmed protocol stream and a [`PendingBlockProcessor`] that stays
486    /// in sync with it automatically.
487    ///
488    /// The stream pipeline forwards every confirmed [`FeedMessage`] to the processor via an
489    /// internal unbounded channel — it never blocks waiting for the consumer. The consumer
490    /// owns the returned `PendingBlockProcessor` exclusively and may wrap it in whatever
491    /// synchronisation primitive suits their use case (e.g. `Mutex` for shared access,
492    /// nothing for single-threaded use).
493    ///
494    /// Call [`generate_pending_update`](PendingBlockProcessor::generate_pending_update) to
495    /// simulate a candidate bundle; it drains the channel automatically before computing.
496    pub async fn build_with_pending(
497        self,
498    ) -> Result<
499        (impl Stream<Item = Result<Update, StreamDecodeError>>, PendingBlockProcessor),
500        StreamError,
501    > {
502        initialize_hook_handlers().map_err(|e| {
503            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
504        })?;
505        let (_, rx) = self.stream_builder.build().await?;
506        let decoder = Arc::new(self.decoder);
507
508        let (advance_tx, advance_rx) =
509            tokio::sync::mpsc::unbounded_channel::<FeedMessage<BlockHeader>>();
510        let pending = PendingBlockProcessor::new(
511            self.pending_indexers,
512            decoder.clone(),
513            self.chain,
514            advance_rx,
515        );
516
517        let stream_end_policy = self.stream_end_policy;
518        let stream = Box::pin(
519            ReceiverStream::new(rx)
520                .take_while(move |msg| match msg {
521                    Ok(msg) => {
522                        let states = msg.sync_states.values();
523                        if stream_end_policy.should_end(states) {
524                            error!(
525                                "Block stream ended due to {:?}: {:?}",
526                                stream_end_policy, msg.sync_states
527                            );
528                            futures::future::ready(false)
529                        } else {
530                            futures::future::ready(true)
531                        }
532                    }
533                    Err(e) => {
534                        error!("Block stream ended with terminal error: {e}");
535                        futures::future::ready(false)
536                    }
537                })
538                .then({
539                    let decoder = decoder.clone();
540                    move |msg| {
541                        let decoder = decoder.clone();
542                        let advance_tx = advance_tx.clone();
543                        async move {
544                            let msg = msg.expect("Safe since stream ends if we receive an error");
545                            // Non-blocking: if the receiver is gone we just skip the send.
546                            let _ = advance_tx.send(msg.clone());
547                            decoder.decode(&msg).await.map_err(|e| {
548                                debug!(msg=?msg, "Decode error: {}", e);
549                                e
550                            })
551                        }
552                    }
553                }),
554        );
555        let stream = inject_native_wrapper(stream, self.chain);
556        Ok((stream, pending))
557    }
558
559    /// Builds and returns the configured protocol stream.
560    ///
561    /// See the module-level docs for details on stream behavior and emitted messages.
562    /// This method applies all builder settings and starts the stream.
563    pub async fn build(
564        self,
565    ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
566        initialize_hook_handlers().map_err(|e| {
567            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
568        })?;
569        let (_, rx) = self.stream_builder.build().await?;
570        let decoder = Arc::new(self.decoder);
571        let chain = self.chain;
572
573        let stream = Box::pin(
574            ReceiverStream::new(rx)
575                .take_while(move |msg| match msg {
576                    Ok(msg) => {
577                        let states = msg.sync_states.values();
578                        if self
579                            .stream_end_policy
580                            .should_end(states)
581                        {
582                            error!(
583                                "Block stream ended due to {:?}: {:?}",
584                                self.stream_end_policy, msg.sync_states
585                            );
586                            futures::future::ready(false)
587                        } else {
588                            futures::future::ready(true)
589                        }
590                    }
591                    Err(e) => {
592                        error!("Block stream ended with terminal error: {e}");
593                        futures::future::ready(false)
594                    }
595                })
596                .then({
597                    let decoder = decoder.clone();
598                    move |msg| {
599                        let decoder = decoder.clone();
600                        async move {
601                            let msg = msg.expect("Save since stream ends if we receive an error");
602                            decoder.decode(&msg).await.map_err(|e| {
603                                debug!(msg=?msg, "Decode error: {}", e);
604                                e
605                            })
606                        }
607                    }
608                }),
609        );
610        let stream = inject_native_wrapper(stream, chain);
611        Ok(stream)
612    }
613}
614
615/// Wraps a decoded protocol stream to inject a `NativeWrapperState` component
616/// on the first successful update.
617///
618/// Skips injection for chains where the native and wrapped-native tokens share
619/// the same address (e.g. Starknet).
620fn inject_native_wrapper(
621    inner: impl Stream<Item = Result<Update, StreamDecodeError>> + Unpin + Send + 'static,
622    chain: Chain,
623) -> impl Stream<Item = Result<Update, StreamDecodeError>> + Send {
624    let has_distinct_wrapper = chain.native_token().address != chain.wrapped_native_token().address;
625
626    if !has_distinct_wrapper {
627        return Either::Left(inner);
628    }
629
630    Either::Right(
631        stream::once(async move {
632            let mut inner = inner;
633            let first = inner.next().await;
634            let modified = first.into_iter().map(move |result| {
635                result.map(|mut update| {
636                    update.new_pairs.insert(
637                        NATIVE_WRAPPER_ID.to_string(),
638                        NativeWrapperState::component(chain),
639                    );
640                    update.states.insert(
641                        NATIVE_WRAPPER_ID.to_string(),
642                        Box::new(NativeWrapperState::new(chain)),
643                    );
644                    debug!("Injected native_wrapper component for {chain}");
645                    update
646                })
647            });
648            stream::iter(modified).chain(inner)
649        })
650        .flatten(),
651    )
652}
653
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{stream, StreamExt};
    use tycho_common::models::Chain;

    use super::*;
    use crate::protocol::models::Update;

    /// Creates an update for `block` that carries no states and no new pairs.
    fn empty_update(block: u64) -> Update {
        Update::new(block, HashMap::new(), HashMap::new())
    }

    /// Only the first message of the stream may carry the native wrapper component/state.
    #[tokio::test]
    async fn test_inject_native_wrapper_first_message_only() {
        let input =
            stream::iter((1..=3).map(|block| Ok::<_, StreamDecodeError>(empty_update(block))));

        let results = inject_native_wrapper(input, Chain::Ethereum)
            .collect::<Vec<_>>()
            .await;

        assert_eq!(results.len(), 3);

        let first = results[0]
            .as_ref()
            .expect("first update ok");
        let first_has_component = first
            .new_pairs
            .contains_key(NATIVE_WRAPPER_ID);
        let first_has_state = first
            .states
            .contains_key(NATIVE_WRAPPER_ID);
        assert!(first_has_component, "first message should have native_wrapper component");
        assert!(first_has_state, "first message should have native_wrapper state");

        let second = results[1]
            .as_ref()
            .expect("second update ok");
        let second_has_component = second
            .new_pairs
            .contains_key(NATIVE_WRAPPER_ID);
        let second_has_state = second
            .states
            .contains_key(NATIVE_WRAPPER_ID);
        assert!(!second_has_component, "second message should NOT have native_wrapper component");
        assert!(!second_has_state, "second message should NOT have native_wrapper state");
    }
}