tycho_simulation/evm/stream.rs
1//! Builder for configuring a multi-protocol stream.
2//!
3//! Provides a builder for creating a multi-protocol stream that produces
4//! protocol state update messages. It runs one synchronization worker per protocol
5//! and a supervisor that aggregates updates, ensuring gap‑free streaming
6//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps a `TychoStream` from `tycho-client`. It decodes `FeedMessage`s
11//! into protocol state updates. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//! set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
29//! Aggregating by block adds small latency, since the supervisor waits briefly for
30//! all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
31//!
32//! The stream ends only when **all** workers are `Stale` or `Ended`.
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide token metadata up front so the decoder can initialize protocol states from startup
43//! snapshots. `set_tokens` does not act as an ongoing filter — components arriving after startup
44//! include their own token metadata. To restrict processing to specific tokens, apply that filter
45//! in your consumer when reading `new_components`. New tokens arriving via stream deltas are added
46//! automatically when their quality exceeds `min_token_quality`.
47//!
48//! ### StreamEndPolicy
49//! Control when the stream ends based on worker states. By default, it ends when all
50//! workers are `Stale` or `Ended`.
51//!
52//! ## Stream
53//! The stream emits one protocol state update every `block_time`. Each update
54//! reports protocol synchronization states and any changes.
55//!
56//! The `new_components` field lists newly deployed components and their tokens.
57//!
58//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
59//! most errors, so users should rarely need to restart it manually.
60//!
61//! ## Example
62//! ```no_run
63//! use tycho_common::models::Chain;
64//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
65//! use tycho_simulation::utils::load_all_tokens;
66//! use futures::StreamExt;
67//! use tycho_client::feed::component_tracker::ComponentFilter;
68//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
69//! use std::collections::HashSet;
70//!
71//! #[tokio::main]
72//! async fn main() {
73//! let all_tokens = load_all_tokens(
74//! "tycho-beta.propellerheads.xyz",
75//! false,
76//! Some("sampletoken"),
77//! true,
78//! Chain::Ethereum,
79//! None,
80//! None,
81//! )
82//! .await
83//! .expect("Failed loading tokens");
84//!
85//! let protocol_stream =
86//! ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
87//! .auth_key(Some("sampletoken".to_string()))
88//! .skip_state_decode_failures(true)
89//! .exchange::<UniswapV2State>(
90//! "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
91//! )
92//! .blocklist_components(HashSet::new())
93//! .set_tokens(all_tokens)
94//! .await
95//! .build()
96//! .await
97//! .expect("Failed building protocol stream");
98//! tokio::pin!(protocol_stream);
99//!
100//! // Loop through block updates
101//! while let Some(msg) = protocol_stream.next().await {
102//! dbg!(msg).expect("failed decoding");
103//! }
104//! }
105//! ```
106use std::{
107 collections::{HashMap, HashSet},
108 sync::Arc,
109 time,
110};
111
112use futures::{future::Either, stream, Stream, StreamExt};
113use tokio_stream::wrappers::ReceiverStream;
114use tracing::{debug, error, warn};
115use tycho_client::{
116 feed::{
117 component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
118 FeedMessage, SynchronizerState,
119 },
120 stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
121};
122use tycho_common::{
123 models::{token::Token, Chain},
124 simulation::protocol_sim::ProtocolSim,
125 traits::TxDeltaIndexer,
126 Bytes,
127};
128
129use crate::{
130 evm::{
131 decoder::{StreamDecodeError, TychoStreamDecoder},
132 pending::PendingBlockProcessor,
133 protocol::{
134 native_wrapper::state::{NativeWrapperState, NATIVE_WRAPPER_ID},
135 uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
136 },
137 },
138 protocol::{
139 errors::InvalidSnapshotError,
140 models::{DecoderContext, TryFromWithBlock, Update},
141 },
142};
143
/// Protocol names for which `ProtocolStreamBuilder::exchange` and
/// `ProtocolStreamBuilder::exchange_with_decoder_context` log a warning when the caller
/// does not supply a client-side filter function.
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = [
    "vm:balancer_v2",
    "vm:curve",
];
145
/// Decides, from the synchronizer states reported with each message, whether the stream
/// should be terminated early.
#[derive(Debug, Clone, Copy, Default)]
pub enum StreamEndPolicy {
    /// Only end once every protocol is `Stale` or `Ended`. This is the default.
    #[default]
    AllEndedOrStale,
    /// End as soon as at least one protocol is `Ended`.
    AnyEnded,
    /// End as soon as at least one protocol is either `Ended` or `Stale`.
    AnyEndedOrStale,
    /// End as soon as at least one protocol is `Stale`.
    AnyStale,
}
158
159impl StreamEndPolicy {
160 fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
161 let mut it = states.into_iter();
162 match self {
163 StreamEndPolicy::AllEndedOrStale => false,
164 StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
165 StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
166 StreamEndPolicy::AnyEndedOrStale => {
167 it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
168 }
169 }
170 }
171}
172
/// Builds and configures the multi protocol stream described in the [module-level docs](self).
///
/// See the module documentation for details on protocols, configuration options, and
/// stream behavior.
pub struct ProtocolStreamBuilder {
    /// Decoder that turns feed messages into protocol state updates; exchanges, filters and
    /// tokens are registered on it by the builder methods.
    decoder: TychoStreamDecoder<BlockHeader>,
    /// Underlying `tycho-client` stream builder; most setters simply forward to it.
    stream_builder: TychoStreamBuilder,
    /// Policy consulted for every message to decide whether the stream ends early.
    stream_end_policy: StreamEndPolicy,
    /// Chain this stream is built for.
    chain: Chain,
    /// Pending-block indexers keyed by extractor name; handed to the
    /// `PendingBlockProcessor` in `build_with_pending`.
    pending_indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
}
184
185impl ProtocolStreamBuilder {
186 /// Creates a new builder for a multi-protocol stream.
187 ///
188 /// See the [module-level docs](self) for full details on stream behavior and configuration.
189 pub fn new(tycho_url: &str, chain: Chain) -> Self {
190 Self {
191 decoder: TychoStreamDecoder::new(),
192 stream_builder: TychoStreamBuilder::new(tycho_url, chain),
193 stream_end_policy: StreamEndPolicy::default(),
194 chain,
195 pending_indexers: HashMap::new(),
196 }
197 }
198
199 /// Adds a specific exchange to the stream.
200 ///
201 /// This configures the builder to include a new protocol synchronizer for `name`,
202 /// filtering its components according to `filter` and optionally `filter_fn`.
203 ///
204 /// The type parameter `T` specifies the decoder type for this exchange. All
205 /// component states for this exchange will be decoded into instances of `T`.
206 ///
207 /// # Parameters
208 ///
209 /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
210 /// - `filter`: Defines the set of components to include in the stream.
211 /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
212 /// expressible in `filter`.
213 ///
214 /// # Notes
215 ///
216 /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
217 /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
218 /// filter function is required to ensure correct decoding and quoting logic.
219 pub fn exchange<T>(
220 mut self,
221 name: &str,
222 filter: ComponentFilter,
223 filter_fn: Option<fn(&ComponentWithState) -> bool>,
224 ) -> Self
225 where
226 T: ProtocolSim
227 + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
228 + Send
229 + 'static,
230 {
231 self.stream_builder = self
232 .stream_builder
233 .exchange(name, filter);
234 self.decoder.register_decoder::<T>(name);
235 if let Some(predicate) = filter_fn {
236 self.decoder
237 .register_filter(name, predicate);
238 }
239
240 if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
241 warn!(
242 "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
243 name
244 );
245 }
246
247 self
248 }
249
250 /// Adds a specific exchange to the stream with decoder context.
251 ///
252 /// This configures the builder to include a new protocol synchronizer for `name`,
253 /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
254 /// the DecoderContext (this is useful to test protocols that are not live yet)
255 ///
256 /// The type parameter `T` specifies the decoder type for this exchange. All
257 /// component states for this exchange will be decoded into instances of `T`.
258 ///
259 /// # Parameters
260 ///
261 /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
262 /// - `filter`: Defines the set of components to include in the stream.
263 /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
264 /// expressible in `filter`.
265 /// - `decoder_context`: The decoder context for this exchange
266 ///
267 /// # Notes
268 ///
269 /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
270 /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
271 /// filter function is required to ensure correct decoding and quoting logic.
272 pub fn exchange_with_decoder_context<T>(
273 mut self,
274 name: &str,
275 filter: ComponentFilter,
276 filter_fn: Option<fn(&ComponentWithState) -> bool>,
277 decoder_context: DecoderContext,
278 ) -> Self
279 where
280 T: ProtocolSim
281 + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
282 + Send
283 + 'static,
284 {
285 self.stream_builder = self
286 .stream_builder
287 .exchange(name, filter);
288 self.decoder
289 .register_decoder_with_context::<T>(name, decoder_context);
290 if let Some(predicate) = filter_fn {
291 self.decoder
292 .register_filter(name, predicate);
293 }
294
295 if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
296 warn!(
297 "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
298 name
299 );
300 }
301
302 self
303 }
304
305 /// Sets the block time interval for the stream.
306 ///
307 /// This controls how often the stream produces updates.
308 pub fn block_time(mut self, block_time: u64) -> Self {
309 self.stream_builder = self
310 .stream_builder
311 .block_time(block_time);
312 self
313 }
314
315 /// Sets the network operation timeout (deprecated).
316 ///
317 /// Use [`latency_buffer()`](Self::latency_buffer) instead for controlling latency.
318 /// This method is retained for backwards compatibility.
319 #[deprecated = "Use latency_buffer instead"]
320 pub fn timeout(mut self, timeout: u64) -> Self {
321 self.stream_builder = self.stream_builder.timeout(timeout);
322 self
323 }
324
325 /// Sets the latency buffer to aggregate same-block messages.
326 ///
327 /// This allows the supervisor to wait a short interval for all synchronizers to emit
328 /// before aggregating.
329 pub fn latency_buffer(mut self, timeout: u64) -> Self {
330 self.stream_builder = self.stream_builder.timeout(timeout);
331 self
332 }
333
334 /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
335 pub fn max_missed_blocks(mut self, n: u64) -> Self {
336 self.stream_builder = self.stream_builder.max_missed_blocks(n);
337 self
338 }
339
340 /// Sets how long a synchronizer may take to process the initial message.
341 ///
342 /// Useful for data-intensive protocols where startup decoding takes longer.
343 pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
344 self.stream_builder = self
345 .stream_builder
346 .startup_timeout(timeout);
347 self
348 }
349
350 /// Configures the stream to exclude state updates.
351 ///
352 /// This reduces bandwidth and decoding workload if protocol state is not of
353 /// interest (e.g. only process new tokens).
354 pub fn no_state(mut self, no_state: bool) -> Self {
355 self.stream_builder = self.stream_builder.no_state(no_state);
356 self
357 }
358
359 /// Sets the API key for authenticating with the Tycho server.
360 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
361 self.stream_builder = self.stream_builder.auth_key(auth_key);
362 self
363 }
364
365 /// Disables TLS/ SSL for the connection, using http and ws protocols.
366 ///
367 /// This is not recommended for production use.
368 pub fn no_tls(mut self, no_tls: bool) -> Self {
369 self.stream_builder = self.stream_builder.no_tls(no_tls);
370 self
371 }
372
373 /// Disable compression for the connection.
374 pub fn disable_compression(mut self) -> Self {
375 self.stream_builder = self
376 .stream_builder
377 .disable_compression();
378 self
379 }
380
381 /// Enables partial block updates (flashblocks).
382 pub fn enable_partial_blocks(mut self) -> Self {
383 self.stream_builder = self
384 .stream_builder
385 .enable_partial_blocks();
386 self
387 }
388
389 /// Exclude specific component IDs from all registered exchanges.
390 pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
391 if !ids.is_empty() {
392 tracing::info!("Blocklisting {} components", ids.len());
393 self.stream_builder = self.stream_builder.blocklisted_ids(ids);
394 }
395 self
396 }
397
398 /// Sets the stream end policy.
399 ///
400 /// Controls when the stream should stop based on synchronizer states.
401 ///
402 /// ## Note
403 /// The stream always ends latest if all protocols are stale or ended independent of
404 /// this configuration. This allows you to end the stream earlier than that.
405 ///
406 /// See [self::StreamEndPolicy] for possible configuration options.
407 pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
408 self.stream_end_policy = stream_end_policy;
409 self
410 }
411
412 /// Provides token metadata used to decode startup snapshots and initialize protocol states.
413 ///
414 /// This is not a stream filter — components arriving after startup include their own token
415 /// metadata. To restrict to specific tokens, filter in your consumer logic. New tokens
416 /// arriving via stream deltas are added automatically if they meet the quality threshold.
417 pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
418 self.decoder.set_tokens(tokens).await;
419 self
420 }
421
422 /// Skips decoding errors for component state updates.
423 ///
424 /// Allows the stream to continue processing even if some states fail to decode,
425 /// logging a warning instead of panicking.
426 pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
427 self.decoder
428 .skip_state_decode_failures(skip);
429 self
430 }
431
432 /// Sets the minimum token quality for tokens added via the stream.
433 ///
434 /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
435 /// Set this to the same value used in [`load_all_tokens()`](crate::utils::load_all_tokens) to
436 /// apply consistent filtering.
437 pub fn min_token_quality(mut self, quality: u32) -> Self {
438 self.decoder.min_token_quality(quality);
439 self
440 }
441
442 /// Configures the retry policy for websocket reconnects.
443 pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
444 self.stream_builder = self
445 .stream_builder
446 .websockets_retry_config(config);
447 self
448 }
449
450 /// Configures the retry policy for state synchronization.
451 pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
452 self.stream_builder = self
453 .stream_builder
454 .state_synchronizer_retry_config(config);
455 self
456 }
457
458 pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
459 &self.decoder
460 }
461
462 /// Registers a [`TxDeltaIndexer`] for ephemeral pending-block simulation.
463 ///
464 /// The indexer is associated with `extractor` (the protocol synchronizer name, e.g.
465 /// `"uniswap_v3"`). Use [`build_with_pending`](Self::build_with_pending) to obtain both
466 /// the confirmed stream and the pending processor.
467 ///
468 /// Returns an error if `extractor` names a VM protocol (prefix `"vm:"`), which requires
469 /// `update_engine()` and cannot be simulated natively.
470 pub fn with_pending_indexer(
471 mut self,
472 extractor: &str,
473 indexer: Box<dyn TxDeltaIndexer>,
474 ) -> Result<Self, StreamError> {
475 if extractor.starts_with("vm:") {
476 return Err(StreamError::SetUpError(format!(
477 "extractor '{extractor}' is a VM protocol; TxDeltaIndexer only supports native protocols"
478 )));
479 }
480 self.pending_indexers
481 .insert(extractor.to_string(), indexer);
482 Ok(self)
483 }
484
485 /// Builds the confirmed protocol stream and a [`PendingBlockProcessor`] that stays
486 /// in sync with it automatically.
487 ///
488 /// The stream pipeline forwards every confirmed [`FeedMessage`] to the processor via an
489 /// internal unbounded channel — it never blocks waiting for the consumer. The consumer
490 /// owns the returned `PendingBlockProcessor` exclusively and may wrap it in whatever
491 /// synchronisation primitive suits their use case (e.g. `Mutex` for shared access,
492 /// nothing for single-threaded use).
493 ///
494 /// Call [`generate_pending_update`](PendingBlockProcessor::generate_pending_update) to
495 /// simulate a candidate bundle; it drains the channel automatically before computing.
496 pub async fn build_with_pending(
497 self,
498 ) -> Result<
499 (impl Stream<Item = Result<Update, StreamDecodeError>>, PendingBlockProcessor),
500 StreamError,
501 > {
502 initialize_hook_handlers().map_err(|e| {
503 StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
504 })?;
505 let (_, rx) = self.stream_builder.build().await?;
506 let decoder = Arc::new(self.decoder);
507
508 let (advance_tx, advance_rx) =
509 tokio::sync::mpsc::unbounded_channel::<FeedMessage<BlockHeader>>();
510 let pending = PendingBlockProcessor::new(
511 self.pending_indexers,
512 decoder.clone(),
513 self.chain,
514 advance_rx,
515 );
516
517 let stream_end_policy = self.stream_end_policy;
518 let stream = Box::pin(
519 ReceiverStream::new(rx)
520 .take_while(move |msg| match msg {
521 Ok(msg) => {
522 let states = msg.sync_states.values();
523 if stream_end_policy.should_end(states) {
524 error!(
525 "Block stream ended due to {:?}: {:?}",
526 stream_end_policy, msg.sync_states
527 );
528 futures::future::ready(false)
529 } else {
530 futures::future::ready(true)
531 }
532 }
533 Err(e) => {
534 error!("Block stream ended with terminal error: {e}");
535 futures::future::ready(false)
536 }
537 })
538 .then({
539 let decoder = decoder.clone();
540 move |msg| {
541 let decoder = decoder.clone();
542 let advance_tx = advance_tx.clone();
543 async move {
544 let msg = msg.expect("Safe since stream ends if we receive an error");
545 // Non-blocking: if the receiver is gone we just skip the send.
546 let _ = advance_tx.send(msg.clone());
547 decoder.decode(&msg).await.map_err(|e| {
548 debug!(msg=?msg, "Decode error: {}", e);
549 e
550 })
551 }
552 }
553 }),
554 );
555 let stream = inject_native_wrapper(stream, self.chain);
556 Ok((stream, pending))
557 }
558
559 /// Builds and returns the configured protocol stream.
560 ///
561 /// See the module-level docs for details on stream behavior and emitted messages.
562 /// This method applies all builder settings and starts the stream.
563 pub async fn build(
564 self,
565 ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
566 initialize_hook_handlers().map_err(|e| {
567 StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
568 })?;
569 let (_, rx) = self.stream_builder.build().await?;
570 let decoder = Arc::new(self.decoder);
571 let chain = self.chain;
572
573 let stream = Box::pin(
574 ReceiverStream::new(rx)
575 .take_while(move |msg| match msg {
576 Ok(msg) => {
577 let states = msg.sync_states.values();
578 if self
579 .stream_end_policy
580 .should_end(states)
581 {
582 error!(
583 "Block stream ended due to {:?}: {:?}",
584 self.stream_end_policy, msg.sync_states
585 );
586 futures::future::ready(false)
587 } else {
588 futures::future::ready(true)
589 }
590 }
591 Err(e) => {
592 error!("Block stream ended with terminal error: {e}");
593 futures::future::ready(false)
594 }
595 })
596 .then({
597 let decoder = decoder.clone();
598 move |msg| {
599 let decoder = decoder.clone();
600 async move {
601 let msg = msg.expect("Save since stream ends if we receive an error");
602 decoder.decode(&msg).await.map_err(|e| {
603 debug!(msg=?msg, "Decode error: {}", e);
604 e
605 })
606 }
607 }
608 }),
609 );
610 let stream = inject_native_wrapper(stream, chain);
611 Ok(stream)
612 }
613}
614
615/// Wraps a decoded protocol stream to inject a `NativeWrapperState` component
616/// on the first successful update.
617///
618/// Skips injection for chains where the native and wrapped-native tokens share
619/// the same address (e.g. Starknet).
620fn inject_native_wrapper(
621 inner: impl Stream<Item = Result<Update, StreamDecodeError>> + Unpin + Send + 'static,
622 chain: Chain,
623) -> impl Stream<Item = Result<Update, StreamDecodeError>> + Send {
624 let has_distinct_wrapper = chain.native_token().address != chain.wrapped_native_token().address;
625
626 if !has_distinct_wrapper {
627 return Either::Left(inner);
628 }
629
630 Either::Right(
631 stream::once(async move {
632 let mut inner = inner;
633 let first = inner.next().await;
634 let modified = first.into_iter().map(move |result| {
635 result.map(|mut update| {
636 update.new_pairs.insert(
637 NATIVE_WRAPPER_ID.to_string(),
638 NativeWrapperState::component(chain),
639 );
640 update.states.insert(
641 NATIVE_WRAPPER_ID.to_string(),
642 Box::new(NativeWrapperState::new(chain)),
643 );
644 debug!("Injected native_wrapper component for {chain}");
645 update
646 })
647 });
648 stream::iter(modified).chain(inner)
649 })
650 .flatten(),
651 )
652}
653
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{stream, StreamExt};
    use tycho_common::models::Chain;

    use super::*;
    use crate::protocol::models::Update;

    /// Builds an update for `block` without any states or new pairs.
    fn empty_update(block: u64) -> Update {
        Update::new(block, HashMap::new(), HashMap::new())
    }

    /// The native wrapper must be injected into the first message and into no later one.
    #[tokio::test]
    async fn test_inject_native_wrapper_first_message_only() {
        let updates: Vec<Result<Update, StreamDecodeError>> = (1..=3)
            .map(empty_update)
            .map(Ok)
            .collect();

        let wrapped = inject_native_wrapper(stream::iter(updates), Chain::Ethereum);
        let results = wrapped.collect::<Vec<_>>().await;

        assert_eq!(results.len(), 3);

        let head = results[0].as_ref().expect("first update ok");
        let head_has_component = head.new_pairs.contains_key(NATIVE_WRAPPER_ID);
        let head_has_state = head.states.contains_key(NATIVE_WRAPPER_ID);
        assert!(head_has_component, "first message should have native_wrapper component");
        assert!(head_has_state, "first message should have native_wrapper state");

        let next = results[1].as_ref().expect("second update ok");
        let next_has_component = next.new_pairs.contains_key(NATIVE_WRAPPER_ID);
        let next_has_state = next.states.contains_key(NATIVE_WRAPPER_ID);
        assert!(!next_has_component, "second message should NOT have native_wrapper component");
        assert!(!next_has_state, "second message should NOT have native_wrapper state");
    }
}