tycho_simulation/evm/stream.rs
1//! Builder for configuring a multi-protocol stream.
2//!
3//! Provides a builder for creating a multi-protocol stream that produces
4//! [`protocol::models::Update`] messages. It runs one synchronization worker per protocol
5//! and a supervisor that aggregates updates, ensuring gap-free streaming
6//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps [`tycho_client::stream::TychoStream`]. It decodes `FeedMessage`s
11//! into [`protocol::models::Update`]s. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//! set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
29//! Aggregating by block adds small latency, since the supervisor waits briefly for
30//! all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
31//!
32//! The stream ends only when **all** workers are `Stale` or `Ended`.
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide an initial set of tokens of interest. The first message includes only
43//! components whose tokens match this set. The stream adds new tokens automatically
44//! when a component is deployed and its quality exceeds `min_token_quality`.
45//!
46//! ### StreamEndPolicy
47//! Control when the stream ends based on worker states. By default, it ends when all
48//! workers are `Stale` or `Ended`.
49//!
50//! ## Stream
51//! The stream emits one [`protocol::models::Update`] every `block_time`. Each update
52//! reports protocol synchronization states and any changes.
53//!
54//! The `new_components` field lists newly deployed components and their tokens.
55//!
56//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
57//! most errors, so users should rarely need to restart it manually.
58//!
59//! ## Example
60//! ```no_run
61//! use tycho_common::models::Chain;
62//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
63//! use tycho_simulation::utils::load_all_tokens;
64//! use futures::StreamExt;
65//! use tycho_client::feed::component_tracker::ComponentFilter;
66//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
67//!
68//! #[tokio::main]
69//! async fn main() {
70//! let all_tokens = load_all_tokens(
71//! "tycho-beta.propellerheads.xyz",
72//! false,
73//! Some("sampletoken"),
74//! true,
75//! Chain::Ethereum,
76//! None,
77//! None,
78//! )
79//! .await
80//! .expect("Failed loading tokens");
81//!
82//! let mut protocol_stream =
83//! ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
84//! .auth_key(Some("sampletoken".to_string()))
85//! .skip_state_decode_failures(true)
86//! .exchange::<UniswapV2State>(
87//! "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
88//! )
89//! .blocklist_components("blocklist.toml")
90//! .set_tokens(all_tokens)
91//! .await
92//! .build()
93//! .await
94//! .expect("Failed building protocol stream");
95//!
96//! // Loop through block updates
97//! while let Some(msg) = protocol_stream.next().await {
98//! dbg!(msg).expect("failed decoding");
99//! }
100//! }
101//! ```
102use std::{collections::HashMap, path::Path, sync::Arc, time};
103
104use futures::{Stream, StreamExt};
105use tokio_stream::wrappers::ReceiverStream;
106use tracing::{debug, error, warn};
107use tycho_client::{
108 feed::{
109 component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
110 SynchronizerState,
111 },
112 stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
113};
114use tycho_common::{
115 models::{token::Token, Chain},
116 simulation::protocol_sim::ProtocolSim,
117 Bytes,
118};
119
120use crate::{
121 evm::{
122 decoder::{StreamDecodeError, TychoStreamDecoder},
123 protocol::uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
124 },
125 protocol::{
126 errors::InvalidSnapshotError,
127 models::{DecoderContext, TryFromWithBlock, Update},
128 },
129};
130
/// Protocol names for which the builder logs a warning when an exchange is registered
/// without a client-side filter function.
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
132
/// Policy deciding when the protocol stream terminates, based on the synchronizer
/// states reported alongside each message.
///
/// Derives `PartialEq`/`Eq` so that configured policies can be compared directly.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamEndPolicy {
    /// End stream if all states are Stale or Ended (default)
    #[default]
    AllEndedOrStale,
    /// End stream if any protocol ended
    AnyEnded,
    /// End stream if any protocol ended or is stale
    AnyEndedOrStale,
    /// End stream if any protocol is stale
    AnyStale,
}
145
146impl StreamEndPolicy {
147 fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
148 let mut it = states.into_iter();
149 match self {
150 StreamEndPolicy::AllEndedOrStale => false,
151 StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
152 StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
153 StreamEndPolicy::AnyEndedOrStale => {
154 it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
155 }
156 }
157 }
158}
159
160/// Builds and configures the multi protocol stream described in the [module-level docs](self).
161///
162/// See the module documentation for details on protocols, configuration options, and
163/// stream behavior.
164pub struct ProtocolStreamBuilder {
165 decoder: TychoStreamDecoder<BlockHeader>,
166 stream_builder: TychoStreamBuilder,
167 stream_end_policy: StreamEndPolicy,
168}
169
170impl ProtocolStreamBuilder {
171 /// Creates a new builder for a multi-protocol stream.
172 ///
173 /// See the [module-level docs](self) for full details on stream behavior and configuration.
174 pub fn new(tycho_url: &str, chain: Chain) -> Self {
175 Self {
176 decoder: TychoStreamDecoder::new(),
177 stream_builder: TychoStreamBuilder::new(tycho_url, chain.into()),
178 stream_end_policy: StreamEndPolicy::default(),
179 }
180 }
181
182 /// Adds a specific exchange to the stream.
183 ///
184 /// This configures the builder to include a new protocol synchronizer for `name`,
185 /// filtering its components according to `filter` and optionally `filter_fn`.
186 ///
187 /// The type parameter `T` specifies the decoder type for this exchange. All
188 /// component states for this exchange will be decoded into instances of `T`.
189 ///
190 /// # Parameters
191 ///
192 /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
193 /// - `filter`: Defines the set of components to include in the stream.
194 /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
195 /// expressible in `filter`.
196 ///
197 /// # Notes
198 ///
199 /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
200 /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
201 /// filter function is required to ensure correct decoding and quoting logic.
202 pub fn exchange<T>(
203 mut self,
204 name: &str,
205 filter: ComponentFilter,
206 filter_fn: Option<fn(&ComponentWithState) -> bool>,
207 ) -> Self
208 where
209 T: ProtocolSim
210 + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
211 + Send
212 + 'static,
213 {
214 self.stream_builder = self
215 .stream_builder
216 .exchange(name, filter);
217 self.decoder.register_decoder::<T>(name);
218 if let Some(predicate) = filter_fn {
219 self.decoder
220 .register_filter(name, predicate);
221 }
222
223 if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
224 warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
225 }
226
227 self
228 }
229
230 /// Adds a specific exchange to the stream with decoder context.
231 ///
232 /// This configures the builder to include a new protocol synchronizer for `name`,
233 /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
234 /// the DecoderContext (this is useful to test protocols that are not live yet)
235 ///
236 /// The type parameter `T` specifies the decoder type for this exchange. All
237 /// component states for this exchange will be decoded into instances of `T`.
238 ///
239 /// # Parameters
240 ///
241 /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
242 /// - `filter`: Defines the set of components to include in the stream.
243 /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
244 /// expressible in `filter`.
245 /// - `decoder_context`: The decoder context for this exchange
246 ///
247 /// # Notes
248 ///
249 /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
250 /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
251 /// filter function is required to ensure correct decoding and quoting logic.
252 pub fn exchange_with_decoder_context<T>(
253 mut self,
254 name: &str,
255 filter: ComponentFilter,
256 filter_fn: Option<fn(&ComponentWithState) -> bool>,
257 decoder_context: DecoderContext,
258 ) -> Self
259 where
260 T: ProtocolSim
261 + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
262 + Send
263 + 'static,
264 {
265 self.stream_builder = self
266 .stream_builder
267 .exchange(name, filter);
268 self.decoder
269 .register_decoder_with_context::<T>(name, decoder_context);
270 if let Some(predicate) = filter_fn {
271 self.decoder
272 .register_filter(name, predicate);
273 }
274
275 if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
276 warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
277 }
278
279 self
280 }
281
282 /// Sets the block time interval for the stream.
283 ///
284 /// This controls how often the stream produces updates.
285 pub fn block_time(mut self, block_time: u64) -> Self {
286 self.stream_builder = self
287 .stream_builder
288 .block_time(block_time);
289 self
290 }
291
292 /// Sets the network operation timeout (deprecated).
293 ///
294 /// Use [`latency_buffer`] instead for controlling latency.
295 /// This method is retained for backwards compatibility.
296 #[deprecated = "Use latency_buffer instead"]
297 pub fn timeout(mut self, timeout: u64) -> Self {
298 self.stream_builder = self.stream_builder.timeout(timeout);
299 self
300 }
301
302 /// Sets the latency buffer to aggregate same-block messages.
303 ///
304 /// This allows the supervisor to wait a short interval for all synchronizers to emit
305 /// before aggregating.
306 pub fn latency_buffer(mut self, timeout: u64) -> Self {
307 self.stream_builder = self.stream_builder.timeout(timeout);
308 self
309 }
310
311 /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
312 pub fn max_missed_blocks(mut self, n: u64) -> Self {
313 self.stream_builder = self.stream_builder.max_missed_blocks(n);
314 self
315 }
316
317 /// Sets how long a synchronizer may take to process the initial message.
318 ///
319 /// Useful for data-intensive protocols where startup decoding takes longer.
320 pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
321 self.stream_builder = self
322 .stream_builder
323 .startup_timeout(timeout);
324 self
325 }
326
327 /// Configures the stream to exclude state updates.
328 ///
329 /// This reduces bandwidth and decoding workload if protocol state is not of
330 /// interest (e.g. only process new tokens).
331 pub fn no_state(mut self, no_state: bool) -> Self {
332 self.stream_builder = self.stream_builder.no_state(no_state);
333 self
334 }
335
336 /// Sets the API key for authenticating with the Tycho server.
337 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
338 self.stream_builder = self.stream_builder.auth_key(auth_key);
339 self
340 }
341
342 /// Disables TLS/ SSL for the connection, using http and ws protocols.
343 ///
344 /// This is not recommended for production use.
345 pub fn no_tls(mut self, no_tls: bool) -> Self {
346 self.stream_builder = self.stream_builder.no_tls(no_tls);
347 self
348 }
349
350 /// Disable compression for the connection.
351 pub fn disable_compression(mut self) -> Self {
352 self.stream_builder = self
353 .stream_builder
354 .disable_compression();
355 self
356 }
357
358 /// Enables partial block updates (flashblocks).
359 pub fn enable_partial_blocks(mut self) -> Self {
360 self.stream_builder = self
361 .stream_builder
362 .enable_partial_blocks();
363 self
364 }
365
366 /// Exclude specific component IDs from all registered exchanges.
367 ///
368 /// Loads blocklisted component IDs from a TOML file at `path`.
369 /// If the file doesn't exist or can't be parsed, no components are
370 /// excluded and a warning is logged.
371 pub fn blocklist_components(mut self, path: impl AsRef<Path>) -> Self {
372 let ids = crate::utils::load_blocklist(path.as_ref());
373 if !ids.is_empty() {
374 tracing::info!("Blocklisting {} components", ids.len());
375 self.stream_builder = self.stream_builder.blocklisted_ids(ids);
376 }
377 self
378 }
379
380 /// Sets the stream end policy.
381 ///
382 /// Controls when the stream should stop based on synchronizer states.
383 ///
384 /// ## Note
385 /// The stream always ends latest if all protocols are stale or ended independent of
386 /// this configuration. This allows you to end the stream earlier than that.
387 ///
388 /// See [self::StreamEndPolicy] for possible configuration options.
389 pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
390 self.stream_end_policy = stream_end_policy;
391 self
392 }
393
394 /// Sets the initial tokens to consider during decoding.
395 ///
396 /// Only components containing these tokens will be decoded initially.
397 /// New tokens may be added automatically if they meet the quality threshold.
398 pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
399 self.decoder.set_tokens(tokens).await;
400 self
401 }
402
403 /// Skips decoding errors for component state updates.
404 ///
405 /// Allows the stream to continue processing even if some states fail to decode,
406 /// logging a warning instead of panicking.
407 pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
408 self.decoder
409 .skip_state_decode_failures(skip);
410 self
411 }
412
413 /// Configures the retry policy for websocket reconnects.
414 pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
415 self.stream_builder = self
416 .stream_builder
417 .websockets_retry_config(config);
418 self
419 }
420
421 /// Configures the retry policy for state synchronization.
422 pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
423 self.stream_builder = self
424 .stream_builder
425 .state_synchronizer_retry_config(config);
426 self
427 }
428
429 pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
430 &self.decoder
431 }
432
433 /// Builds and returns the configured protocol stream.
434 ///
435 /// See the module-level docs for details on stream behavior and emitted messages.
436 /// This method applies all builder settings and starts the stream.
437 pub async fn build(
438 self,
439 ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
440 initialize_hook_handlers().map_err(|e| {
441 StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
442 })?;
443 let (_, rx) = self.stream_builder.build().await?;
444 let decoder = Arc::new(self.decoder);
445
446 let stream = Box::pin(
447 ReceiverStream::new(rx)
448 .take_while(move |msg| match msg {
449 Ok(msg) => {
450 let states = msg.sync_states.values();
451 if self
452 .stream_end_policy
453 .should_end(states)
454 {
455 error!(
456 "Block stream ended due to {:?}: {:?}",
457 self.stream_end_policy, msg.sync_states
458 );
459 futures::future::ready(false)
460 } else {
461 futures::future::ready(true)
462 }
463 }
464 Err(e) => {
465 error!("Block stream ended with terminal error: {e}");
466 futures::future::ready(false)
467 }
468 })
469 .then({
470 let decoder = decoder.clone(); // Clone the decoder for the closure
471 move |msg| {
472 let decoder = decoder.clone(); // Clone again for the async block
473 async move {
474 let msg = msg.expect("Save since stream ends if we receive an error");
475 decoder.decode(&msg).await.map_err(|e| {
476 debug!(msg=?msg, "Decode error: {}", e);
477 e
478 })
479 }
480 }
481 }),
482 );
483 Ok(stream)
484 }
485}