tycho_simulation/evm/
stream.rs

1//! Builder for configuring a multi-protocol stream.
2//!
3//! Provides a builder for creating a multi-protocol stream that produces
4//! [`protocol::models::Update`] messages. It runs one synchronization worker per protocol
//! and a supervisor that aggregates updates, ensuring gap-free streaming
6//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps [`tycho_client::stream::TychoStream`]. It decodes `FeedMessage`s
11//! into [`protocol::models::Update`]s. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//!   set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
29//! Aggregating by block adds small latency, since the supervisor waits briefly for
30//! all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
31//!
32//! The stream ends only when **all** workers are `Stale` or `Ended`.
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide an initial set of tokens of interest. The first message includes only
43//! components whose tokens match this set. The stream adds new tokens automatically
44//! when a component is deployed and its quality exceeds `min_token_quality`.
45//!
46//! ### StreamEndPolicy
47//! Control when the stream ends based on worker states. By default, it ends when all
48//! workers are `Stale` or `Ended`.
49//!
50//! ## Stream
51//! The stream emits one [`protocol::models::Update`] every `block_time`. Each update
52//! reports protocol synchronization states and any changes.
53//!
54//! The `new_components` field lists newly deployed components and their tokens.
55//!
56//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
57//! most errors, so users should rarely need to restart it manually.
58//!
59//! ## Example
60//! ```no_run
61//! use tycho_common::models::Chain;
62//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
63//! use tycho_simulation::utils::load_all_tokens;
64//! use futures::StreamExt;
65//! use tycho_client::feed::component_tracker::ComponentFilter;
66//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
67//!
68//! #[tokio::main]
69//! async fn main() {
70//!     let all_tokens = load_all_tokens(
71//!         "tycho-beta.propellerheads.xyz",
72//!         false,
73//!         Some("sampletoken"),
74//!         true,
75//!         Chain::Ethereum,
76//!         None,
77//!         None,
78//!     )
79//!     .await
80//!     .expect("Failed loading tokens");
81//!
82//!     let mut protocol_stream =
83//!         ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
84//!             .auth_key(Some("sampletoken".to_string()))
85//!             .skip_state_decode_failures(true)
86//!             .exchange::<UniswapV2State>(
87//!                 "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
88//!             )
89//!             .set_tokens(all_tokens)
90//!             .await
91//!             .build()
92//!             .await
93//!             .expect("Failed building protocol stream");
94//!
95//!     // Loop through block updates
96//!     while let Some(msg) = protocol_stream.next().await {
97//!         dbg!(msg).expect("failed decoding");
98//!     }
99//! }
100//! ```
101use std::{collections::HashMap, sync::Arc, time};
102
103use futures::{Stream, StreamExt};
104use tokio_stream::wrappers::ReceiverStream;
105use tracing::{debug, error, warn};
106use tycho_client::{
107    feed::{
108        component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
109        SynchronizerState,
110    },
111    stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
112};
113use tycho_common::{
114    models::{token::Token, Chain},
115    simulation::protocol_sim::ProtocolSim,
116    Bytes,
117};
118
119use crate::{
120    evm::{
121        decoder::{StreamDecodeError, TychoStreamDecoder},
122        protocol::uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
123    },
124    protocol::{
125        errors::InvalidSnapshotError,
126        models::{DecoderContext, TryFromWithBlock, Update},
127    },
128};
129
/// Exchange names for which the builder logs a warning when an exchange is registered
/// without a client-side filter function, because not all of their pools are supported.
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = [
    "vm:balancer_v2",
    "vm:curve",
];
131
/// Policy selecting which synchronizer states make the protocol stream stop early.
///
/// Every variant is a plain marker; the policy is `Copy` and can be compared with `==`.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamEndPolicy {
    /// End stream if all states are Stale or Ended (default)
    #[default]
    AllEndedOrStale,
    /// End stream if any protocol ended
    AnyEnded,
    /// End stream if any protocol ended or is stale
    AnyEndedOrStale,
    /// End stream if any protocol is stale
    AnyStale,
}
144
145impl StreamEndPolicy {
146    fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
147        let mut it = states.into_iter();
148        match self {
149            StreamEndPolicy::AllEndedOrStale => false,
150            StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
151            StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
152            StreamEndPolicy::AnyEndedOrStale => {
153                it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
154            }
155        }
156    }
157}
158
159/// Builds and configures the multi protocol stream described in the [module-level docs](self).
160///
161/// See the module documentation for details on protocols, configuration options, and
162/// stream behavior.
163pub struct ProtocolStreamBuilder {
164    decoder: TychoStreamDecoder<BlockHeader>,
165    stream_builder: TychoStreamBuilder,
166    stream_end_policy: StreamEndPolicy,
167}
168
169impl ProtocolStreamBuilder {
170    /// Creates a new builder for a multi-protocol stream.
171    ///
172    /// See the [module-level docs](self) for full details on stream behavior and configuration.
173    pub fn new(tycho_url: &str, chain: Chain) -> Self {
174        Self {
175            decoder: TychoStreamDecoder::new(),
176            stream_builder: TychoStreamBuilder::new(tycho_url, chain.into()),
177            stream_end_policy: StreamEndPolicy::default(),
178        }
179    }
180
181    /// Adds a specific exchange to the stream.
182    ///
183    /// This configures the builder to include a new protocol synchronizer for `name`,
184    /// filtering its components according to `filter` and optionally `filter_fn`.
185    ///
186    /// The type parameter `T` specifies the decoder type for this exchange. All
187    /// component states for this exchange will be decoded into instances of `T`.
188    ///
189    /// # Parameters
190    ///
191    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
192    /// - `filter`: Defines the set of components to include in the stream.
193    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
194    ///   expressible in `filter`.
195    ///
196    /// # Notes
197    ///
198    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
199    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
200    /// filter function is required to ensure correct decoding and quoting logic.
201    pub fn exchange<T>(
202        mut self,
203        name: &str,
204        filter: ComponentFilter,
205        filter_fn: Option<fn(&ComponentWithState) -> bool>,
206    ) -> Self
207    where
208        T: ProtocolSim
209            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
210            + Send
211            + 'static,
212    {
213        self.stream_builder = self
214            .stream_builder
215            .exchange(name, filter);
216        self.decoder.register_decoder::<T>(name);
217        if let Some(predicate) = filter_fn {
218            self.decoder
219                .register_filter(name, predicate);
220        }
221
222        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
223            warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
224        }
225
226        self
227    }
228
229    /// Adds a specific exchange to the stream with decoder context.
230    ///
231    /// This configures the builder to include a new protocol synchronizer for `name`,
232    /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
233    /// the DecoderContext (this is useful to test protocols that are not live yet)
234    ///
235    /// The type parameter `T` specifies the decoder type for this exchange. All
236    /// component states for this exchange will be decoded into instances of `T`.
237    ///
238    /// # Parameters
239    ///
240    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
241    /// - `filter`: Defines the set of components to include in the stream.
242    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
243    ///   expressible in `filter`.
244    /// - `decoder_context`: The decoder context for this exchange
245    ///
246    /// # Notes
247    ///
248    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
249    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
250    /// filter function is required to ensure correct decoding and quoting logic.
251    pub fn exchange_with_decoder_context<T>(
252        mut self,
253        name: &str,
254        filter: ComponentFilter,
255        filter_fn: Option<fn(&ComponentWithState) -> bool>,
256        decoder_context: DecoderContext,
257    ) -> Self
258    where
259        T: ProtocolSim
260            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
261            + Send
262            + 'static,
263    {
264        self.stream_builder = self
265            .stream_builder
266            .exchange(name, filter);
267        self.decoder
268            .register_decoder_with_context::<T>(name, decoder_context);
269        if let Some(predicate) = filter_fn {
270            self.decoder
271                .register_filter(name, predicate);
272        }
273
274        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
275            warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
276        }
277
278        self
279    }
280
281    /// Sets the block time interval for the stream.
282    ///
283    /// This controls how often the stream produces updates.
284    pub fn block_time(mut self, block_time: u64) -> Self {
285        self.stream_builder = self
286            .stream_builder
287            .block_time(block_time);
288        self
289    }
290
291    /// Sets the network operation timeout (deprecated).
292    ///
293    /// Use [`latency_buffer`] instead for controlling latency.
294    /// This method is retained for backwards compatibility.
295    #[deprecated = "Use latency_buffer instead"]
296    pub fn timeout(mut self, timeout: u64) -> Self {
297        self.stream_builder = self.stream_builder.timeout(timeout);
298        self
299    }
300
301    /// Sets the latency buffer to aggregate same-block messages.
302    ///
303    /// This allows the supervisor to wait a short interval for all synchronizers to emit
304    /// before aggregating.
305    pub fn latency_buffer(mut self, timeout: u64) -> Self {
306        self.stream_builder = self.stream_builder.timeout(timeout);
307        self
308    }
309
310    /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
311    pub fn max_missed_blocks(mut self, n: u64) -> Self {
312        self.stream_builder = self.stream_builder.max_missed_blocks(n);
313        self
314    }
315
316    /// Sets how long a synchronizer may take to process the initial message.
317    ///
318    /// Useful for data-intensive protocols where startup decoding takes longer.
319    pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
320        self.stream_builder = self
321            .stream_builder
322            .startup_timeout(timeout);
323        self
324    }
325
326    /// Configures the stream to exclude state updates.
327    ///
328    /// This reduces bandwidth and decoding workload if protocol state is not of
329    /// interest (e.g. only process new tokens).
330    pub fn no_state(mut self, no_state: bool) -> Self {
331        self.stream_builder = self.stream_builder.no_state(no_state);
332        self
333    }
334
335    /// Sets the API key for authenticating with the Tycho server.
336    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
337        self.stream_builder = self.stream_builder.auth_key(auth_key);
338        self
339    }
340
341    /// Disables TLS/ SSL for the connection, using http and ws protocols.
342    ///
343    /// This is not recommended for production use.
344    pub fn no_tls(mut self, no_tls: bool) -> Self {
345        self.stream_builder = self.stream_builder.no_tls(no_tls);
346        self
347    }
348
349    /// Disable compression for the connection.
350    pub fn disable_compression(mut self) -> Self {
351        self.stream_builder = self
352            .stream_builder
353            .disable_compression();
354        self
355    }
356
357    /// Sets the stream end policy.
358    ///
359    /// Controls when the stream should stop based on synchronizer states.
360    ///
361    /// ## Note
362    /// The stream always ends latest if all protocols are stale or ended independent of
363    /// this configuration. This allows you to end the stream earlier than that.
364    ///
365    /// See [self::StreamEndPolicy] for possible configuration options.
366    pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
367        self.stream_end_policy = stream_end_policy;
368        self
369    }
370
371    /// Sets the initial tokens to consider during decoding.
372    ///
373    /// Only components containing these tokens will be decoded initially.
374    /// New tokens may be added automatically if they meet the quality threshold.
375    pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
376        self.decoder.set_tokens(tokens).await;
377        self
378    }
379
380    /// Skips decoding errors for component state updates.
381    ///
382    /// Allows the stream to continue processing even if some states fail to decode,
383    /// logging a warning instead of panicking.
384    pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
385        self.decoder
386            .skip_state_decode_failures(skip);
387        self
388    }
389
390    /// Configures the retry policy for websocket reconnects.
391    pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
392        self.stream_builder = self
393            .stream_builder
394            .websockets_retry_config(config);
395        self
396    }
397
398    /// Configures the retry policy for state synchronization.
399    pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
400        self.stream_builder = self
401            .stream_builder
402            .state_synchronizer_retry_config(config);
403        self
404    }
405
406    pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
407        &self.decoder
408    }
409
410    /// Builds and returns the configured protocol stream.
411    ///
412    /// See the module-level docs for details on stream behavior and emitted messages.
413    /// This method applies all builder settings and starts the stream.
414    pub async fn build(
415        self,
416    ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
417        initialize_hook_handlers().map_err(|e| {
418            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
419        })?;
420        let (_, rx) = self.stream_builder.build().await?;
421        let decoder = Arc::new(self.decoder);
422
423        let stream = Box::pin(
424            ReceiverStream::new(rx)
425                .take_while(move |msg| match msg {
426                    Ok(msg) => {
427                        let states = msg.sync_states.values();
428                        if self
429                            .stream_end_policy
430                            .should_end(states)
431                        {
432                            error!(
433                                "Block stream ended due to {:?}: {:?}",
434                                self.stream_end_policy, msg.sync_states
435                            );
436                            futures::future::ready(false)
437                        } else {
438                            futures::future::ready(true)
439                        }
440                    }
441                    Err(e) => {
442                        error!("Block stream ended with terminal error: {e}");
443                        futures::future::ready(false)
444                    }
445                })
446                .then({
447                    let decoder = decoder.clone(); // Clone the decoder for the closure
448                    move |msg| {
449                        let decoder = decoder.clone(); // Clone again for the async block
450                        async move {
451                            let msg = msg.expect("Save since stream ends if we receive an error");
452                            decoder.decode(&msg).await.map_err(|e| {
453                                debug!(msg=?msg, "Decode error: {}", e);
454                                e
455                            })
456                        }
457                    }
458                }),
459        );
460        Ok(stream)
461    }
462}