Skip to main content

tycho_simulation/evm/
stream.rs

1//! Builder for configuring a multi-protocol stream.
2//!
3//! Provides a builder for creating a multi-protocol stream that produces
4//! [`protocol::models::Update`] messages. It runs one synchronization worker per protocol
5//! and a supervisor that aggregates updates, ensuring gap‑free streaming
6//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps [`tycho_client::stream::TychoStream`]. It decodes `FeedMessage`s
11//! into [`protocol::models::Update`]s. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//!   set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
29//! Aggregating by block adds small latency, since the supervisor waits briefly for
30//! all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
31//!
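//! By default, the stream ends only when **all** workers are `Stale` or `Ended`; a stricter
//! `StreamEndPolicy` can end it earlier (see the Configuration section below).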
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide an initial set of tokens of interest. The first message includes only
43//! components whose tokens match this set. The stream adds new tokens automatically
44//! when a component is deployed and its quality exceeds `min_token_quality`.
45//!
46//! ### StreamEndPolicy
47//! Control when the stream ends based on worker states. By default, it ends when all
48//! workers are `Stale` or `Ended`.
49//!
50//! ## Stream
51//! The stream emits one [`protocol::models::Update`] every `block_time`. Each update
52//! reports protocol synchronization states and any changes.
53//!
54//! The `new_components` field lists newly deployed components and their tokens.
55//!
56//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
57//! most errors, so users should rarely need to restart it manually.
58//!
59//! ## Example
60//! ```no_run
61//! use tycho_common::models::Chain;
62//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
63//! use tycho_simulation::utils::load_all_tokens;
64//! use futures::StreamExt;
65//! use tycho_client::feed::component_tracker::ComponentFilter;
66//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
67//! use std::collections::HashSet;
68//!
69//! #[tokio::main]
70//! async fn main() {
71//!     let all_tokens = load_all_tokens(
72//!         "tycho-beta.propellerheads.xyz",
73//!         false,
74//!         Some("sampletoken"),
75//!         true,
76//!         Chain::Ethereum,
77//!         None,
78//!         None,
79//!     )
80//!     .await
81//!     .expect("Failed loading tokens");
82//!
83//!     let mut protocol_stream =
84//!         ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
85//!             .auth_key(Some("sampletoken".to_string()))
86//!             .skip_state_decode_failures(true)
87//!             .exchange::<UniswapV2State>(
88//!                 "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
89//!             )
90//!             .blocklist_components(HashSet::new())
91//!             .set_tokens(all_tokens)
92//!             .await
93//!             .build()
94//!             .await
95//!             .expect("Failed building protocol stream");
96//!
97//!     // Loop through block updates
98//!     while let Some(msg) = protocol_stream.next().await {
99//!         dbg!(msg).expect("failed decoding");
100//!     }
101//! }
102//! ```
103use std::{
104    collections::{HashMap, HashSet},
105    sync::Arc,
106    time,
107};
108
109use futures::{Stream, StreamExt};
110use tokio_stream::wrappers::ReceiverStream;
111use tracing::{debug, error, warn};
112use tycho_client::{
113    feed::{
114        component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
115        SynchronizerState,
116    },
117    stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
118};
119use tycho_common::{
120    models::{token::Token, Chain},
121    simulation::protocol_sim::ProtocolSim,
122    Bytes,
123};
124
125use crate::{
126    evm::{
127        decoder::{StreamDecodeError, TychoStreamDecoder},
128        protocol::uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
129    },
130    protocol::{
131        errors::InvalidSnapshotError,
132        models::{DecoderContext, TryFromWithBlock, Update},
133    },
134};
135
/// Exchange names for which `exchange` / `exchange_with_decoder_context` log a warning when
/// no client-side filter function is supplied.
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
137
/// Policy deciding when the protocol stream terminates, based on the synchronizer
/// states reported with each message.
///
/// The policy is a plain, copyable value and can be compared and hashed, so it can be
/// stored in configuration structs and checked in tests.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StreamEndPolicy {
    /// End stream if all states are Stale or Ended (default)
    #[default]
    AllEndedOrStale,
    /// End stream if any protocol ended
    AnyEnded,
    /// End stream if any protocol ended or is stale
    AnyEndedOrStale,
    /// End stream if any protocol is stale
    AnyStale,
}
150
151impl StreamEndPolicy {
152    fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
153        let mut it = states.into_iter();
154        match self {
155            StreamEndPolicy::AllEndedOrStale => false,
156            StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
157            StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
158            StreamEndPolicy::AnyEndedOrStale => {
159                it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
160            }
161        }
162    }
163}
164
/// Builds and configures the multi protocol stream described in the [module-level docs](self).
///
/// See the module documentation for details on protocols, configuration options, and
/// stream behavior.
pub struct ProtocolStreamBuilder {
    /// Decodes feed messages into [`Update`]s; exchange decoders, filters and tokens are
    /// registered on it by the builder methods.
    decoder: TychoStreamDecoder<BlockHeader>,
    /// Builder of the underlying Tycho stream that most setters forward to.
    stream_builder: TychoStreamBuilder,
    /// Policy checked against each message's sync states to decide whether the stream ends.
    stream_end_policy: StreamEndPolicy,
}
174
175impl ProtocolStreamBuilder {
176    /// Creates a new builder for a multi-protocol stream.
177    ///
178    /// See the [module-level docs](self) for full details on stream behavior and configuration.
179    pub fn new(tycho_url: &str, chain: Chain) -> Self {
180        Self {
181            decoder: TychoStreamDecoder::new(),
182            stream_builder: TychoStreamBuilder::new(tycho_url, chain.into()),
183            stream_end_policy: StreamEndPolicy::default(),
184        }
185    }
186
187    /// Adds a specific exchange to the stream.
188    ///
189    /// This configures the builder to include a new protocol synchronizer for `name`,
190    /// filtering its components according to `filter` and optionally `filter_fn`.
191    ///
192    /// The type parameter `T` specifies the decoder type for this exchange. All
193    /// component states for this exchange will be decoded into instances of `T`.
194    ///
195    /// # Parameters
196    ///
197    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
198    /// - `filter`: Defines the set of components to include in the stream.
199    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
200    ///   expressible in `filter`.
201    ///
202    /// # Notes
203    ///
204    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
205    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
206    /// filter function is required to ensure correct decoding and quoting logic.
207    pub fn exchange<T>(
208        mut self,
209        name: &str,
210        filter: ComponentFilter,
211        filter_fn: Option<fn(&ComponentWithState) -> bool>,
212    ) -> Self
213    where
214        T: ProtocolSim
215            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
216            + Send
217            + 'static,
218    {
219        self.stream_builder = self
220            .stream_builder
221            .exchange(name, filter);
222        self.decoder.register_decoder::<T>(name);
223        if let Some(predicate) = filter_fn {
224            self.decoder
225                .register_filter(name, predicate);
226        }
227
228        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
229            warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
230        }
231
232        self
233    }
234
235    /// Adds a specific exchange to the stream with decoder context.
236    ///
237    /// This configures the builder to include a new protocol synchronizer for `name`,
238    /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
239    /// the DecoderContext (this is useful to test protocols that are not live yet)
240    ///
241    /// The type parameter `T` specifies the decoder type for this exchange. All
242    /// component states for this exchange will be decoded into instances of `T`.
243    ///
244    /// # Parameters
245    ///
246    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
247    /// - `filter`: Defines the set of components to include in the stream.
248    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
249    ///   expressible in `filter`.
250    /// - `decoder_context`: The decoder context for this exchange
251    ///
252    /// # Notes
253    ///
254    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
255    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
256    /// filter function is required to ensure correct decoding and quoting logic.
257    pub fn exchange_with_decoder_context<T>(
258        mut self,
259        name: &str,
260        filter: ComponentFilter,
261        filter_fn: Option<fn(&ComponentWithState) -> bool>,
262        decoder_context: DecoderContext,
263    ) -> Self
264    where
265        T: ProtocolSim
266            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
267            + Send
268            + 'static,
269    {
270        self.stream_builder = self
271            .stream_builder
272            .exchange(name, filter);
273        self.decoder
274            .register_decoder_with_context::<T>(name, decoder_context);
275        if let Some(predicate) = filter_fn {
276            self.decoder
277                .register_filter(name, predicate);
278        }
279
280        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
281            warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
282        }
283
284        self
285    }
286
287    /// Sets the block time interval for the stream.
288    ///
289    /// This controls how often the stream produces updates.
290    pub fn block_time(mut self, block_time: u64) -> Self {
291        self.stream_builder = self
292            .stream_builder
293            .block_time(block_time);
294        self
295    }
296
297    /// Sets the network operation timeout (deprecated).
298    ///
299    /// Use [`latency_buffer`] instead for controlling latency.
300    /// This method is retained for backwards compatibility.
301    #[deprecated = "Use latency_buffer instead"]
302    pub fn timeout(mut self, timeout: u64) -> Self {
303        self.stream_builder = self.stream_builder.timeout(timeout);
304        self
305    }
306
307    /// Sets the latency buffer to aggregate same-block messages.
308    ///
309    /// This allows the supervisor to wait a short interval for all synchronizers to emit
310    /// before aggregating.
311    pub fn latency_buffer(mut self, timeout: u64) -> Self {
312        self.stream_builder = self.stream_builder.timeout(timeout);
313        self
314    }
315
316    /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
317    pub fn max_missed_blocks(mut self, n: u64) -> Self {
318        self.stream_builder = self.stream_builder.max_missed_blocks(n);
319        self
320    }
321
322    /// Sets how long a synchronizer may take to process the initial message.
323    ///
324    /// Useful for data-intensive protocols where startup decoding takes longer.
325    pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
326        self.stream_builder = self
327            .stream_builder
328            .startup_timeout(timeout);
329        self
330    }
331
332    /// Configures the stream to exclude state updates.
333    ///
334    /// This reduces bandwidth and decoding workload if protocol state is not of
335    /// interest (e.g. only process new tokens).
336    pub fn no_state(mut self, no_state: bool) -> Self {
337        self.stream_builder = self.stream_builder.no_state(no_state);
338        self
339    }
340
341    /// Sets the API key for authenticating with the Tycho server.
342    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
343        self.stream_builder = self.stream_builder.auth_key(auth_key);
344        self
345    }
346
347    /// Disables TLS/ SSL for the connection, using http and ws protocols.
348    ///
349    /// This is not recommended for production use.
350    pub fn no_tls(mut self, no_tls: bool) -> Self {
351        self.stream_builder = self.stream_builder.no_tls(no_tls);
352        self
353    }
354
355    /// Disable compression for the connection.
356    pub fn disable_compression(mut self) -> Self {
357        self.stream_builder = self
358            .stream_builder
359            .disable_compression();
360        self
361    }
362
363    /// Enables partial block updates (flashblocks).
364    pub fn enable_partial_blocks(mut self) -> Self {
365        self.stream_builder = self
366            .stream_builder
367            .enable_partial_blocks();
368        self
369    }
370
371    /// Exclude specific component IDs from all registered exchanges.
372    pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
373        if !ids.is_empty() {
374            tracing::info!("Blocklisting {} components", ids.len());
375            self.stream_builder = self.stream_builder.blocklisted_ids(ids);
376        }
377        self
378    }
379
380    /// Sets the stream end policy.
381    ///
382    /// Controls when the stream should stop based on synchronizer states.
383    ///
384    /// ## Note
385    /// The stream always ends latest if all protocols are stale or ended independent of
386    /// this configuration. This allows you to end the stream earlier than that.
387    ///
388    /// See [self::StreamEndPolicy] for possible configuration options.
389    pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
390        self.stream_end_policy = stream_end_policy;
391        self
392    }
393
394    /// Sets the initial tokens to consider during decoding.
395    ///
396    /// Only components containing these tokens will be decoded initially.
397    /// New tokens may be added automatically if they meet the quality threshold.
398    pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
399        self.decoder.set_tokens(tokens).await;
400        self
401    }
402
403    /// Skips decoding errors for component state updates.
404    ///
405    /// Allows the stream to continue processing even if some states fail to decode,
406    /// logging a warning instead of panicking.
407    pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
408        self.decoder
409            .skip_state_decode_failures(skip);
410        self
411    }
412
413    /// Sets the minimum token quality for tokens added via the stream.
414    ///
415    /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
416    /// Set this to the same value used in [`load_all_tokens`] to apply consistent filtering.
417    pub fn min_token_quality(mut self, quality: u32) -> Self {
418        self.decoder.min_token_quality(quality);
419        self
420    }
421
422    /// Configures the retry policy for websocket reconnects.
423    pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
424        self.stream_builder = self
425            .stream_builder
426            .websockets_retry_config(config);
427        self
428    }
429
430    /// Configures the retry policy for state synchronization.
431    pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
432        self.stream_builder = self
433            .stream_builder
434            .state_synchronizer_retry_config(config);
435        self
436    }
437
438    pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
439        &self.decoder
440    }
441
442    /// Builds and returns the configured protocol stream.
443    ///
444    /// See the module-level docs for details on stream behavior and emitted messages.
445    /// This method applies all builder settings and starts the stream.
446    pub async fn build(
447        self,
448    ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
449        initialize_hook_handlers().map_err(|e| {
450            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
451        })?;
452        let (_, rx) = self.stream_builder.build().await?;
453        let decoder = Arc::new(self.decoder);
454
455        let stream = Box::pin(
456            ReceiverStream::new(rx)
457                .take_while(move |msg| match msg {
458                    Ok(msg) => {
459                        let states = msg.sync_states.values();
460                        if self
461                            .stream_end_policy
462                            .should_end(states)
463                        {
464                            error!(
465                                "Block stream ended due to {:?}: {:?}",
466                                self.stream_end_policy, msg.sync_states
467                            );
468                            futures::future::ready(false)
469                        } else {
470                            futures::future::ready(true)
471                        }
472                    }
473                    Err(e) => {
474                        error!("Block stream ended with terminal error: {e}");
475                        futures::future::ready(false)
476                    }
477                })
478                .then({
479                    let decoder = decoder.clone(); // Clone the decoder for the closure
480                    move |msg| {
481                        let decoder = decoder.clone(); // Clone again for the async block
482                        async move {
483                            let msg = msg.expect("Save since stream ends if we receive an error");
484                            decoder.decode(&msg).await.map_err(|e| {
485                                debug!(msg=?msg, "Decode error: {}", e);
486                                e
487                            })
488                        }
489                    }
490                }),
491        );
492        Ok(stream)
493    }
494}