tycho_simulation/evm/stream.rs
1//! Builder for configuring a multi-protocol stream.
2//!
3//! Provides a builder for creating a multi-protocol stream that produces
4//! protocol state update messages. It runs one synchronization worker per protocol
5//! and a supervisor that aggregates updates, ensuring gap‑free streaming
6//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps a `TychoStream` from `tycho-client`. It decodes `FeedMessage`s
11//! into protocol state updates. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//! set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
29//! Aggregating by block adds small latency, since the supervisor waits briefly for
30//! all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
31//!
32//! The stream ends only when **all** workers are `Stale` or `Ended`.
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide token metadata up front so the decoder can initialize protocol states from startup
43//! snapshots. `set_tokens` does not act as an ongoing filter — components arriving after startup
44//! include their own token metadata. To restrict processing to specific tokens, apply that filter
45//! in your consumer when reading `new_components`. New tokens arriving via stream deltas are added
46//! automatically when their quality exceeds `min_token_quality`.
47//!
48//! ### StreamEndPolicy
49//! Control when the stream ends based on worker states. By default, it ends when all
50//! workers are `Stale` or `Ended`.
51//!
52//! ## Stream
53//! The stream emits one protocol state update every `block_time`. Each update
54//! reports protocol synchronization states and any changes.
55//!
56//! The `new_components` field lists newly deployed components and their tokens.
57//!
58//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
59//! most errors, so users should rarely need to restart it manually.
60//!
61//! ## Example
62//! ```no_run
63//! use tycho_common::models::Chain;
64//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
65//! use tycho_simulation::utils::load_all_tokens;
66//! use futures::StreamExt;
67//! use tycho_client::feed::component_tracker::ComponentFilter;
68//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
69//! use std::collections::HashSet;
70//!
71//! #[tokio::main]
72//! async fn main() {
73//! let all_tokens = load_all_tokens(
74//! "tycho-beta.propellerheads.xyz",
75//! false,
76//! Some("sampletoken"),
77//! true,
78//! Chain::Ethereum,
79//! None,
80//! None,
81//! )
82//! .await
83//! .expect("Failed loading tokens");
84//!
85//! let mut protocol_stream =
86//! ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
87//! .auth_key(Some("sampletoken".to_string()))
88//! .skip_state_decode_failures(true)
89//! .exchange::<UniswapV2State>(
90//! "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
91//! )
92//! .blocklist_components(HashSet::new())
93//! .set_tokens(all_tokens)
94//! .await
95//! .build()
96//! .await
97//! .expect("Failed building protocol stream");
98//!
99//! // Loop through block updates
100//! while let Some(msg) = protocol_stream.next().await {
101//! dbg!(msg).expect("failed decoding");
102//! }
103//! }
104//! ```
105use std::{
106 collections::{HashMap, HashSet},
107 sync::Arc,
108 time,
109};
110
111use futures::{Stream, StreamExt};
112use tokio_stream::wrappers::ReceiverStream;
113use tracing::{debug, error, warn};
114use tycho_client::{
115 feed::{
116 component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
117 FeedMessage, SynchronizerState,
118 },
119 stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
120};
121use tycho_common::{
122 models::{token::Token, Chain},
123 simulation::protocol_sim::ProtocolSim,
124 traits::TxDeltaIndexer,
125 Bytes,
126};
127
128use crate::{
129 evm::{
130 decoder::{StreamDecodeError, TychoStreamDecoder},
131 pending::PendingBlockProcessor,
132 protocol::uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
133 },
134 protocol::{
135 errors::InvalidSnapshotError,
136 models::{DecoderContext, TryFromWithBlock, Update},
137 },
138};
139
/// Exchange names for which the builder logs a warning when they are registered without a
/// client-side `filter_fn`, because not all of their pools are supported.
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
141
/// Policy controlling when the protocol stream terminates.
///
/// The policy is evaluated against the synchronizer states carried by each incoming message;
/// once it triggers, the stream stops yielding items.
#[derive(Default, Debug, Clone, Copy)]
pub enum StreamEndPolicy {
    /// End stream if all states are Stale or Ended (default)
    #[default]
    AllEndedOrStale,
    /// End stream if any protocol ended
    AnyEnded,
    /// End stream if any protocol ended or is stale
    AnyEndedOrStale,
    /// End stream if any protocol is stale
    AnyStale,
}
154
155impl StreamEndPolicy {
156 fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
157 let mut it = states.into_iter();
158 match self {
159 StreamEndPolicy::AllEndedOrStale => false,
160 StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
161 StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
162 StreamEndPolicy::AnyEndedOrStale => {
163 it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
164 }
165 }
166 }
167}
168
/// Builds and configures the multi protocol stream described in the [module-level docs](self).
///
/// See the module documentation for details on protocols, configuration options, and
/// stream behavior.
pub struct ProtocolStreamBuilder {
    // Decoder that turns feed messages into `Update`s; exchange decoders, client-side filters
    // and tokens are registered on it by the builder methods.
    decoder: TychoStreamDecoder<BlockHeader>,
    // Underlying `tycho-client` stream builder that receives the connection and
    // synchronization settings.
    stream_builder: TychoStreamBuilder,
    // Policy evaluated on every message to decide whether the stream should stop.
    stream_end_policy: StreamEndPolicy,
    // Chain this stream targets; also handed to the pending block processor.
    chain: Chain,
    // Pending-block indexers keyed by extractor (protocol synchronizer) name.
    pending_indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
}
180
181impl ProtocolStreamBuilder {
182 /// Creates a new builder for a multi-protocol stream.
183 ///
184 /// See the [module-level docs](self) for full details on stream behavior and configuration.
185 pub fn new(tycho_url: &str, chain: Chain) -> Self {
186 Self {
187 decoder: TychoStreamDecoder::new(),
188 stream_builder: TychoStreamBuilder::new(tycho_url, chain),
189 stream_end_policy: StreamEndPolicy::default(),
190 chain,
191 pending_indexers: HashMap::new(),
192 }
193 }
194
195 /// Adds a specific exchange to the stream.
196 ///
197 /// This configures the builder to include a new protocol synchronizer for `name`,
198 /// filtering its components according to `filter` and optionally `filter_fn`.
199 ///
200 /// The type parameter `T` specifies the decoder type for this exchange. All
201 /// component states for this exchange will be decoded into instances of `T`.
202 ///
203 /// # Parameters
204 ///
205 /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
206 /// - `filter`: Defines the set of components to include in the stream.
207 /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
208 /// expressible in `filter`.
209 ///
210 /// # Notes
211 ///
212 /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
213 /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
214 /// filter function is required to ensure correct decoding and quoting logic.
215 pub fn exchange<T>(
216 mut self,
217 name: &str,
218 filter: ComponentFilter,
219 filter_fn: Option<fn(&ComponentWithState) -> bool>,
220 ) -> Self
221 where
222 T: ProtocolSim
223 + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
224 + Send
225 + 'static,
226 {
227 self.stream_builder = self
228 .stream_builder
229 .exchange(name, filter);
230 self.decoder.register_decoder::<T>(name);
231 if let Some(predicate) = filter_fn {
232 self.decoder
233 .register_filter(name, predicate);
234 }
235
236 if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
237 warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
238 }
239
240 self
241 }
242
243 /// Adds a specific exchange to the stream with decoder context.
244 ///
245 /// This configures the builder to include a new protocol synchronizer for `name`,
246 /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
247 /// the DecoderContext (this is useful to test protocols that are not live yet)
248 ///
249 /// The type parameter `T` specifies the decoder type for this exchange. All
250 /// component states for this exchange will be decoded into instances of `T`.
251 ///
252 /// # Parameters
253 ///
254 /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
255 /// - `filter`: Defines the set of components to include in the stream.
256 /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
257 /// expressible in `filter`.
258 /// - `decoder_context`: The decoder context for this exchange
259 ///
260 /// # Notes
261 ///
262 /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
263 /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
264 /// filter function is required to ensure correct decoding and quoting logic.
265 pub fn exchange_with_decoder_context<T>(
266 mut self,
267 name: &str,
268 filter: ComponentFilter,
269 filter_fn: Option<fn(&ComponentWithState) -> bool>,
270 decoder_context: DecoderContext,
271 ) -> Self
272 where
273 T: ProtocolSim
274 + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
275 + Send
276 + 'static,
277 {
278 self.stream_builder = self
279 .stream_builder
280 .exchange(name, filter);
281 self.decoder
282 .register_decoder_with_context::<T>(name, decoder_context);
283 if let Some(predicate) = filter_fn {
284 self.decoder
285 .register_filter(name, predicate);
286 }
287
288 if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
289 warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
290 }
291
292 self
293 }
294
295 /// Sets the block time interval for the stream.
296 ///
297 /// This controls how often the stream produces updates.
298 pub fn block_time(mut self, block_time: u64) -> Self {
299 self.stream_builder = self
300 .stream_builder
301 .block_time(block_time);
302 self
303 }
304
305 /// Sets the network operation timeout (deprecated).
306 ///
307 /// Use [`latency_buffer()`](Self::latency_buffer) instead for controlling latency.
308 /// This method is retained for backwards compatibility.
309 #[deprecated = "Use latency_buffer instead"]
310 pub fn timeout(mut self, timeout: u64) -> Self {
311 self.stream_builder = self.stream_builder.timeout(timeout);
312 self
313 }
314
315 /// Sets the latency buffer to aggregate same-block messages.
316 ///
317 /// This allows the supervisor to wait a short interval for all synchronizers to emit
318 /// before aggregating.
319 pub fn latency_buffer(mut self, timeout: u64) -> Self {
320 self.stream_builder = self.stream_builder.timeout(timeout);
321 self
322 }
323
324 /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
325 pub fn max_missed_blocks(mut self, n: u64) -> Self {
326 self.stream_builder = self.stream_builder.max_missed_blocks(n);
327 self
328 }
329
330 /// Sets how long a synchronizer may take to process the initial message.
331 ///
332 /// Useful for data-intensive protocols where startup decoding takes longer.
333 pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
334 self.stream_builder = self
335 .stream_builder
336 .startup_timeout(timeout);
337 self
338 }
339
340 /// Configures the stream to exclude state updates.
341 ///
342 /// This reduces bandwidth and decoding workload if protocol state is not of
343 /// interest (e.g. only process new tokens).
344 pub fn no_state(mut self, no_state: bool) -> Self {
345 self.stream_builder = self.stream_builder.no_state(no_state);
346 self
347 }
348
349 /// Sets the API key for authenticating with the Tycho server.
350 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
351 self.stream_builder = self.stream_builder.auth_key(auth_key);
352 self
353 }
354
355 /// Disables TLS/ SSL for the connection, using http and ws protocols.
356 ///
357 /// This is not recommended for production use.
358 pub fn no_tls(mut self, no_tls: bool) -> Self {
359 self.stream_builder = self.stream_builder.no_tls(no_tls);
360 self
361 }
362
363 /// Disable compression for the connection.
364 pub fn disable_compression(mut self) -> Self {
365 self.stream_builder = self
366 .stream_builder
367 .disable_compression();
368 self
369 }
370
371 /// Enables partial block updates (flashblocks).
372 pub fn enable_partial_blocks(mut self) -> Self {
373 self.stream_builder = self
374 .stream_builder
375 .enable_partial_blocks();
376 self
377 }
378
379 /// Exclude specific component IDs from all registered exchanges.
380 pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
381 if !ids.is_empty() {
382 tracing::info!("Blocklisting {} components", ids.len());
383 self.stream_builder = self.stream_builder.blocklisted_ids(ids);
384 }
385 self
386 }
387
388 /// Sets the stream end policy.
389 ///
390 /// Controls when the stream should stop based on synchronizer states.
391 ///
392 /// ## Note
393 /// The stream always ends latest if all protocols are stale or ended independent of
394 /// this configuration. This allows you to end the stream earlier than that.
395 ///
396 /// See [self::StreamEndPolicy] for possible configuration options.
397 pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
398 self.stream_end_policy = stream_end_policy;
399 self
400 }
401
402 /// Provides token metadata used to decode startup snapshots and initialize protocol states.
403 ///
404 /// This is not a stream filter — components arriving after startup include their own token
405 /// metadata. To restrict to specific tokens, filter in your consumer logic. New tokens
406 /// arriving via stream deltas are added automatically if they meet the quality threshold.
407 pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
408 self.decoder.set_tokens(tokens).await;
409 self
410 }
411
412 /// Skips decoding errors for component state updates.
413 ///
414 /// Allows the stream to continue processing even if some states fail to decode,
415 /// logging a warning instead of panicking.
416 pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
417 self.decoder
418 .skip_state_decode_failures(skip);
419 self
420 }
421
422 /// Sets the minimum token quality for tokens added via the stream.
423 ///
424 /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
425 /// Set this to the same value used in [`load_all_tokens()`](crate::utils::load_all_tokens) to
426 /// apply consistent filtering.
427 pub fn min_token_quality(mut self, quality: u32) -> Self {
428 self.decoder.min_token_quality(quality);
429 self
430 }
431
432 /// Configures the retry policy for websocket reconnects.
433 pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
434 self.stream_builder = self
435 .stream_builder
436 .websockets_retry_config(config);
437 self
438 }
439
440 /// Configures the retry policy for state synchronization.
441 pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
442 self.stream_builder = self
443 .stream_builder
444 .state_synchronizer_retry_config(config);
445 self
446 }
447
448 pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
449 &self.decoder
450 }
451
452 /// Registers a [`TxDeltaIndexer`] for ephemeral pending-block simulation.
453 ///
454 /// The indexer is associated with `extractor` (the protocol synchronizer name, e.g.
455 /// `"uniswap_v3"`). Use [`build_with_pending`](Self::build_with_pending) to obtain both
456 /// the confirmed stream and the pending processor.
457 ///
458 /// Returns an error if `extractor` names a VM protocol (prefix `"vm:"`), which requires
459 /// `update_engine()` and cannot be simulated natively.
460 pub fn with_pending_indexer(
461 mut self,
462 extractor: &str,
463 indexer: Box<dyn TxDeltaIndexer>,
464 ) -> Result<Self, StreamError> {
465 if extractor.starts_with("vm:") {
466 return Err(StreamError::SetUpError(format!(
467 "extractor '{extractor}' is a VM protocol; TxDeltaIndexer only supports native protocols"
468 )));
469 }
470 self.pending_indexers
471 .insert(extractor.to_string(), indexer);
472 Ok(self)
473 }
474
475 /// Builds the confirmed protocol stream and a [`PendingBlockProcessor`] that stays
476 /// in sync with it automatically.
477 ///
478 /// The stream pipeline forwards every confirmed [`FeedMessage`] to the processor via an
479 /// internal unbounded channel — it never blocks waiting for the consumer. The consumer
480 /// owns the returned `PendingBlockProcessor` exclusively and may wrap it in whatever
481 /// synchronisation primitive suits their use case (e.g. `Mutex` for shared access,
482 /// nothing for single-threaded use).
483 ///
484 /// Call [`generate_pending_update`](PendingBlockProcessor::generate_pending_update) to
485 /// simulate a candidate bundle; it drains the channel automatically before computing.
486 pub async fn build_with_pending(
487 self,
488 ) -> Result<
489 (impl Stream<Item = Result<Update, StreamDecodeError>>, PendingBlockProcessor),
490 StreamError,
491 > {
492 initialize_hook_handlers().map_err(|e| {
493 StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
494 })?;
495 let (_, rx) = self.stream_builder.build().await?;
496 let decoder = Arc::new(self.decoder);
497
498 let (advance_tx, advance_rx) =
499 tokio::sync::mpsc::unbounded_channel::<FeedMessage<BlockHeader>>();
500 let pending = PendingBlockProcessor::new(
501 self.pending_indexers,
502 decoder.clone(),
503 self.chain,
504 advance_rx,
505 );
506
507 let stream_end_policy = self.stream_end_policy;
508 let stream = Box::pin(
509 ReceiverStream::new(rx)
510 .take_while(move |msg| match msg {
511 Ok(msg) => {
512 let states = msg.sync_states.values();
513 if stream_end_policy.should_end(states) {
514 error!(
515 "Block stream ended due to {:?}: {:?}",
516 stream_end_policy, msg.sync_states
517 );
518 futures::future::ready(false)
519 } else {
520 futures::future::ready(true)
521 }
522 }
523 Err(e) => {
524 error!("Block stream ended with terminal error: {e}");
525 futures::future::ready(false)
526 }
527 })
528 .then({
529 let decoder = decoder.clone();
530 move |msg| {
531 let decoder = decoder.clone();
532 let advance_tx = advance_tx.clone();
533 async move {
534 let msg = msg.expect("Safe since stream ends if we receive an error");
535 // Non-blocking: if the receiver is gone we just skip the send.
536 let _ = advance_tx.send(msg.clone());
537 decoder.decode(&msg).await.map_err(|e| {
538 debug!(msg=?msg, "Decode error: {}", e);
539 e
540 })
541 }
542 }
543 }),
544 );
545 Ok((stream, pending))
546 }
547
548 /// Builds and returns the configured protocol stream.
549 ///
550 /// See the module-level docs for details on stream behavior and emitted messages.
551 /// This method applies all builder settings and starts the stream.
552 pub async fn build(
553 self,
554 ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
555 initialize_hook_handlers().map_err(|e| {
556 StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
557 })?;
558 let (_, rx) = self.stream_builder.build().await?;
559 let decoder = Arc::new(self.decoder);
560
561 let stream = Box::pin(
562 ReceiverStream::new(rx)
563 .take_while(move |msg| match msg {
564 Ok(msg) => {
565 let states = msg.sync_states.values();
566 if self
567 .stream_end_policy
568 .should_end(states)
569 {
570 error!(
571 "Block stream ended due to {:?}: {:?}",
572 self.stream_end_policy, msg.sync_states
573 );
574 futures::future::ready(false)
575 } else {
576 futures::future::ready(true)
577 }
578 }
579 Err(e) => {
580 error!("Block stream ended with terminal error: {e}");
581 futures::future::ready(false)
582 }
583 })
584 .then({
585 let decoder = decoder.clone(); // Clone the decoder for the closure
586 move |msg| {
587 let decoder = decoder.clone(); // Clone again for the async block
588 async move {
589 let msg = msg.expect("Save since stream ends if we receive an error");
590 decoder.decode(&msg).await.map_err(|e| {
591 debug!(msg=?msg, "Decode error: {}", e);
592 e
593 })
594 }
595 }
596 }),
597 );
598 Ok(stream)
599 }
600}