Skip to main content

tycho_simulation/evm/
stream.rs

1//! Builder for configuring a multi-protocol stream.
2//!
3//! Provides a builder for creating a multi-protocol stream that produces
4//! protocol state update messages. It runs one synchronization worker per protocol
5//! and a supervisor that aggregates updates, ensuring gap‑free streaming
6//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps a `TychoStream` from `tycho-client`. It decodes `FeedMessage`s
11//! into protocol state updates. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//!   set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
29//! Aggregating by block adds small latency, since the supervisor waits briefly for
30//! all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
31//!
32//! The stream ends only when **all** workers are `Stale` or `Ended`.
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide token metadata up front so the decoder can initialize protocol states from startup
43//! snapshots. `set_tokens` does not act as an ongoing filter — components arriving after startup
44//! include their own token metadata. To restrict processing to specific tokens, apply that filter
45//! in your consumer when reading `new_components`. New tokens arriving via stream deltas are added
46//! automatically when their quality exceeds `min_token_quality`.
47//!
48//! ### StreamEndPolicy
49//! Control when the stream ends based on worker states. By default, it ends when all
50//! workers are `Stale` or `Ended`.
51//!
52//! ## Stream
53//! The stream emits one protocol state update every `block_time`. Each update
54//! reports protocol synchronization states and any changes.
55//!
56//! The `new_components` field lists newly deployed components and their tokens.
57//!
58//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
59//! most errors, so users should rarely need to restart it manually.
60//!
61//! ## Example
62//! ```no_run
63//! use tycho_common::models::Chain;
64//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
65//! use tycho_simulation::utils::load_all_tokens;
66//! use futures::StreamExt;
67//! use tycho_client::feed::component_tracker::ComponentFilter;
68//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
69//! use std::collections::HashSet;
70//!
71//! #[tokio::main]
72//! async fn main() {
73//!     let all_tokens = load_all_tokens(
74//!         "tycho-beta.propellerheads.xyz",
75//!         false,
76//!         Some("sampletoken"),
77//!         true,
78//!         Chain::Ethereum,
79//!         None,
80//!         None,
81//!     )
82//!     .await
83//!     .expect("Failed loading tokens");
84//!
85//!     let mut protocol_stream =
86//!         ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
87//!             .auth_key(Some("sampletoken".to_string()))
88//!             .skip_state_decode_failures(true)
89//!             .exchange::<UniswapV2State>(
90//!                 "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
91//!             )
92//!             .blocklist_components(HashSet::new())
93//!             .set_tokens(all_tokens)
94//!             .await
95//!             .build()
96//!             .await
97//!             .expect("Failed building protocol stream");
98//!
99//!     // Loop through block updates
100//!     while let Some(msg) = protocol_stream.next().await {
101//!         dbg!(msg).expect("failed decoding");
102//!     }
103//! }
104//! ```
105use std::{
106    collections::{HashMap, HashSet},
107    sync::Arc,
108    time,
109};
110
111use futures::{Stream, StreamExt};
112use tokio_stream::wrappers::ReceiverStream;
113use tracing::{debug, error, warn};
114use tycho_client::{
115    feed::{
116        component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
117        FeedMessage, SynchronizerState,
118    },
119    stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
120};
121use tycho_common::{
122    models::{token::Token, Chain},
123    simulation::protocol_sim::ProtocolSim,
124    traits::TxDeltaIndexer,
125    Bytes,
126};
127
128use crate::{
129    evm::{
130        decoder::{StreamDecodeError, TychoStreamDecoder},
131        pending::PendingBlockProcessor,
132        protocol::uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
133    },
134    protocol::{
135        errors::InvalidSnapshotError,
136        models::{DecoderContext, TryFromWithBlock, Update},
137    },
138};
139
/// Protocol names for which the builder logs a warning when an exchange is registered
/// without a client-side `filter_fn`.
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
141
/// Policy that decides, from the synchronizer states carried by each message, whether the
/// built stream should stop early.
#[derive(Default, Debug, Clone, Copy)]
pub enum StreamEndPolicy {
    /// End stream if all states are Stale or Ended (default)
    // NOTE(review): `should_end` never fires for this variant; the builder docs state the
    // stream ends on its own in that situation — confirm against `tycho-client`.
    #[default]
    AllEndedOrStale,
    /// End stream if any protocol ended
    AnyEnded,
    /// End stream if any protocol ended or is stale
    AnyEndedOrStale,
    /// End stream if any protocol is stale
    AnyStale,
}
154
155impl StreamEndPolicy {
156    fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
157        let mut it = states.into_iter();
158        match self {
159            StreamEndPolicy::AllEndedOrStale => false,
160            StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
161            StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
162            StreamEndPolicy::AnyEndedOrStale => {
163                it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
164            }
165        }
166    }
167}
168
/// Builds and configures the multi protocol stream described in the [module-level docs](self).
///
/// See the module documentation for details on protocols, configuration options, and
/// stream behavior.
pub struct ProtocolStreamBuilder {
    // Decoder that turns feed messages into `Update`s; protocol decoders and filters are
    // registered on it by the `exchange*` methods.
    decoder: TychoStreamDecoder<BlockHeader>,
    // Underlying `tycho-client` builder that most setters forward to.
    stream_builder: TychoStreamBuilder,
    // Policy checked on every message to decide whether the stream stops early.
    stream_end_policy: StreamEndPolicy,
    // Chain given at construction; handed to the `PendingBlockProcessor` in
    // `build_with_pending`.
    chain: Chain,
    // Pending-block indexers keyed by extractor name, see `with_pending_indexer`.
    pending_indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
}
180
181impl ProtocolStreamBuilder {
182    /// Creates a new builder for a multi-protocol stream.
183    ///
184    /// See the [module-level docs](self) for full details on stream behavior and configuration.
185    pub fn new(tycho_url: &str, chain: Chain) -> Self {
186        Self {
187            decoder: TychoStreamDecoder::new(),
188            stream_builder: TychoStreamBuilder::new(tycho_url, chain),
189            stream_end_policy: StreamEndPolicy::default(),
190            chain,
191            pending_indexers: HashMap::new(),
192        }
193    }
194
195    /// Adds a specific exchange to the stream.
196    ///
197    /// This configures the builder to include a new protocol synchronizer for `name`,
198    /// filtering its components according to `filter` and optionally `filter_fn`.
199    ///
200    /// The type parameter `T` specifies the decoder type for this exchange. All
201    /// component states for this exchange will be decoded into instances of `T`.
202    ///
203    /// # Parameters
204    ///
205    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
206    /// - `filter`: Defines the set of components to include in the stream.
207    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
208    ///   expressible in `filter`.
209    ///
210    /// # Notes
211    ///
212    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
213    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
214    /// filter function is required to ensure correct decoding and quoting logic.
215    pub fn exchange<T>(
216        mut self,
217        name: &str,
218        filter: ComponentFilter,
219        filter_fn: Option<fn(&ComponentWithState) -> bool>,
220    ) -> Self
221    where
222        T: ProtocolSim
223            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
224            + Send
225            + 'static,
226    {
227        self.stream_builder = self
228            .stream_builder
229            .exchange(name, filter);
230        self.decoder.register_decoder::<T>(name);
231        if let Some(predicate) = filter_fn {
232            self.decoder
233                .register_filter(name, predicate);
234        }
235
236        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
237            warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
238        }
239
240        self
241    }
242
243    /// Adds a specific exchange to the stream with decoder context.
244    ///
245    /// This configures the builder to include a new protocol synchronizer for `name`,
246    /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
247    /// the DecoderContext (this is useful to test protocols that are not live yet)
248    ///
249    /// The type parameter `T` specifies the decoder type for this exchange. All
250    /// component states for this exchange will be decoded into instances of `T`.
251    ///
252    /// # Parameters
253    ///
254    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
255    /// - `filter`: Defines the set of components to include in the stream.
256    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
257    ///   expressible in `filter`.
258    /// - `decoder_context`: The decoder context for this exchange
259    ///
260    /// # Notes
261    ///
262    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
263    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
264    /// filter function is required to ensure correct decoding and quoting logic.
265    pub fn exchange_with_decoder_context<T>(
266        mut self,
267        name: &str,
268        filter: ComponentFilter,
269        filter_fn: Option<fn(&ComponentWithState) -> bool>,
270        decoder_context: DecoderContext,
271    ) -> Self
272    where
273        T: ProtocolSim
274            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
275            + Send
276            + 'static,
277    {
278        self.stream_builder = self
279            .stream_builder
280            .exchange(name, filter);
281        self.decoder
282            .register_decoder_with_context::<T>(name, decoder_context);
283        if let Some(predicate) = filter_fn {
284            self.decoder
285                .register_filter(name, predicate);
286        }
287
288        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
289            warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
290        }
291
292        self
293    }
294
295    /// Sets the block time interval for the stream.
296    ///
297    /// This controls how often the stream produces updates.
298    pub fn block_time(mut self, block_time: u64) -> Self {
299        self.stream_builder = self
300            .stream_builder
301            .block_time(block_time);
302        self
303    }
304
305    /// Sets the network operation timeout (deprecated).
306    ///
307    /// Use [`latency_buffer()`](Self::latency_buffer) instead for controlling latency.
308    /// This method is retained for backwards compatibility.
309    #[deprecated = "Use latency_buffer instead"]
310    pub fn timeout(mut self, timeout: u64) -> Self {
311        self.stream_builder = self.stream_builder.timeout(timeout);
312        self
313    }
314
315    /// Sets the latency buffer to aggregate same-block messages.
316    ///
317    /// This allows the supervisor to wait a short interval for all synchronizers to emit
318    /// before aggregating.
319    pub fn latency_buffer(mut self, timeout: u64) -> Self {
320        self.stream_builder = self.stream_builder.timeout(timeout);
321        self
322    }
323
324    /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
325    pub fn max_missed_blocks(mut self, n: u64) -> Self {
326        self.stream_builder = self.stream_builder.max_missed_blocks(n);
327        self
328    }
329
330    /// Sets how long a synchronizer may take to process the initial message.
331    ///
332    /// Useful for data-intensive protocols where startup decoding takes longer.
333    pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
334        self.stream_builder = self
335            .stream_builder
336            .startup_timeout(timeout);
337        self
338    }
339
340    /// Configures the stream to exclude state updates.
341    ///
342    /// This reduces bandwidth and decoding workload if protocol state is not of
343    /// interest (e.g. only process new tokens).
344    pub fn no_state(mut self, no_state: bool) -> Self {
345        self.stream_builder = self.stream_builder.no_state(no_state);
346        self
347    }
348
349    /// Sets the API key for authenticating with the Tycho server.
350    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
351        self.stream_builder = self.stream_builder.auth_key(auth_key);
352        self
353    }
354
355    /// Disables TLS/ SSL for the connection, using http and ws protocols.
356    ///
357    /// This is not recommended for production use.
358    pub fn no_tls(mut self, no_tls: bool) -> Self {
359        self.stream_builder = self.stream_builder.no_tls(no_tls);
360        self
361    }
362
363    /// Disable compression for the connection.
364    pub fn disable_compression(mut self) -> Self {
365        self.stream_builder = self
366            .stream_builder
367            .disable_compression();
368        self
369    }
370
371    /// Enables partial block updates (flashblocks).
372    pub fn enable_partial_blocks(mut self) -> Self {
373        self.stream_builder = self
374            .stream_builder
375            .enable_partial_blocks();
376        self
377    }
378
379    /// Exclude specific component IDs from all registered exchanges.
380    pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
381        if !ids.is_empty() {
382            tracing::info!("Blocklisting {} components", ids.len());
383            self.stream_builder = self.stream_builder.blocklisted_ids(ids);
384        }
385        self
386    }
387
388    /// Sets the stream end policy.
389    ///
390    /// Controls when the stream should stop based on synchronizer states.
391    ///
392    /// ## Note
393    /// The stream always ends latest if all protocols are stale or ended independent of
394    /// this configuration. This allows you to end the stream earlier than that.
395    ///
396    /// See [self::StreamEndPolicy] for possible configuration options.
397    pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
398        self.stream_end_policy = stream_end_policy;
399        self
400    }
401
402    /// Provides token metadata used to decode startup snapshots and initialize protocol states.
403    ///
404    /// This is not a stream filter — components arriving after startup include their own token
405    /// metadata. To restrict to specific tokens, filter in your consumer logic. New tokens
406    /// arriving via stream deltas are added automatically if they meet the quality threshold.
407    pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
408        self.decoder.set_tokens(tokens).await;
409        self
410    }
411
412    /// Skips decoding errors for component state updates.
413    ///
414    /// Allows the stream to continue processing even if some states fail to decode,
415    /// logging a warning instead of panicking.
416    pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
417        self.decoder
418            .skip_state_decode_failures(skip);
419        self
420    }
421
422    /// Sets the minimum token quality for tokens added via the stream.
423    ///
424    /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
425    /// Set this to the same value used in [`load_all_tokens()`](crate::utils::load_all_tokens) to
426    /// apply consistent filtering.
427    pub fn min_token_quality(mut self, quality: u32) -> Self {
428        self.decoder.min_token_quality(quality);
429        self
430    }
431
432    /// Configures the retry policy for websocket reconnects.
433    pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
434        self.stream_builder = self
435            .stream_builder
436            .websockets_retry_config(config);
437        self
438    }
439
440    /// Configures the retry policy for state synchronization.
441    pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
442        self.stream_builder = self
443            .stream_builder
444            .state_synchronizer_retry_config(config);
445        self
446    }
447
    /// Returns a shared reference to the decoder configured on this builder.
    pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
        &self.decoder
    }
451
452    /// Registers a [`TxDeltaIndexer`] for ephemeral pending-block simulation.
453    ///
454    /// The indexer is associated with `extractor` (the protocol synchronizer name, e.g.
455    /// `"uniswap_v3"`). Use [`build_with_pending`](Self::build_with_pending) to obtain both
456    /// the confirmed stream and the pending processor.
457    ///
458    /// Returns an error if `extractor` names a VM protocol (prefix `"vm:"`), which requires
459    /// `update_engine()` and cannot be simulated natively.
460    pub fn with_pending_indexer(
461        mut self,
462        extractor: &str,
463        indexer: Box<dyn TxDeltaIndexer>,
464    ) -> Result<Self, StreamError> {
465        if extractor.starts_with("vm:") {
466            return Err(StreamError::SetUpError(format!(
467                "extractor '{extractor}' is a VM protocol; TxDeltaIndexer only supports native protocols"
468            )));
469        }
470        self.pending_indexers
471            .insert(extractor.to_string(), indexer);
472        Ok(self)
473    }
474
475    /// Builds the confirmed protocol stream and a [`PendingBlockProcessor`] that stays
476    /// in sync with it automatically.
477    ///
478    /// The stream pipeline forwards every confirmed [`FeedMessage`] to the processor via an
479    /// internal unbounded channel — it never blocks waiting for the consumer. The consumer
480    /// owns the returned `PendingBlockProcessor` exclusively and may wrap it in whatever
481    /// synchronisation primitive suits their use case (e.g. `Mutex` for shared access,
482    /// nothing for single-threaded use).
483    ///
484    /// Call [`generate_pending_update`](PendingBlockProcessor::generate_pending_update) to
485    /// simulate a candidate bundle; it drains the channel automatically before computing.
486    pub async fn build_with_pending(
487        self,
488    ) -> Result<
489        (impl Stream<Item = Result<Update, StreamDecodeError>>, PendingBlockProcessor),
490        StreamError,
491    > {
492        initialize_hook_handlers().map_err(|e| {
493            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
494        })?;
495        let (_, rx) = self.stream_builder.build().await?;
496        let decoder = Arc::new(self.decoder);
497
498        let (advance_tx, advance_rx) =
499            tokio::sync::mpsc::unbounded_channel::<FeedMessage<BlockHeader>>();
500        let pending = PendingBlockProcessor::new(
501            self.pending_indexers,
502            decoder.clone(),
503            self.chain,
504            advance_rx,
505        );
506
507        let stream_end_policy = self.stream_end_policy;
508        let stream = Box::pin(
509            ReceiverStream::new(rx)
510                .take_while(move |msg| match msg {
511                    Ok(msg) => {
512                        let states = msg.sync_states.values();
513                        if stream_end_policy.should_end(states) {
514                            error!(
515                                "Block stream ended due to {:?}: {:?}",
516                                stream_end_policy, msg.sync_states
517                            );
518                            futures::future::ready(false)
519                        } else {
520                            futures::future::ready(true)
521                        }
522                    }
523                    Err(e) => {
524                        error!("Block stream ended with terminal error: {e}");
525                        futures::future::ready(false)
526                    }
527                })
528                .then({
529                    let decoder = decoder.clone();
530                    move |msg| {
531                        let decoder = decoder.clone();
532                        let advance_tx = advance_tx.clone();
533                        async move {
534                            let msg = msg.expect("Safe since stream ends if we receive an error");
535                            // Non-blocking: if the receiver is gone we just skip the send.
536                            let _ = advance_tx.send(msg.clone());
537                            decoder.decode(&msg).await.map_err(|e| {
538                                debug!(msg=?msg, "Decode error: {}", e);
539                                e
540                            })
541                        }
542                    }
543                }),
544        );
545        Ok((stream, pending))
546    }
547
548    /// Builds and returns the configured protocol stream.
549    ///
550    /// See the module-level docs for details on stream behavior and emitted messages.
551    /// This method applies all builder settings and starts the stream.
552    pub async fn build(
553        self,
554    ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
555        initialize_hook_handlers().map_err(|e| {
556            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
557        })?;
558        let (_, rx) = self.stream_builder.build().await?;
559        let decoder = Arc::new(self.decoder);
560
561        let stream = Box::pin(
562            ReceiverStream::new(rx)
563                .take_while(move |msg| match msg {
564                    Ok(msg) => {
565                        let states = msg.sync_states.values();
566                        if self
567                            .stream_end_policy
568                            .should_end(states)
569                        {
570                            error!(
571                                "Block stream ended due to {:?}: {:?}",
572                                self.stream_end_policy, msg.sync_states
573                            );
574                            futures::future::ready(false)
575                        } else {
576                            futures::future::ready(true)
577                        }
578                    }
579                    Err(e) => {
580                        error!("Block stream ended with terminal error: {e}");
581                        futures::future::ready(false)
582                    }
583                })
584                .then({
585                    let decoder = decoder.clone(); // Clone the decoder for the closure
586                    move |msg| {
587                        let decoder = decoder.clone(); // Clone again for the async block
588                        async move {
589                            let msg = msg.expect("Save since stream ends if we receive an error");
590                            decoder.decode(&msg).await.map_err(|e| {
591                                debug!(msg=?msg, "Decode error: {}", e);
592                                e
593                            })
594                        }
595                    }
596                }),
597        );
598        Ok(stream)
599    }
600}