tycho_simulation/evm/stream.rs
1//! Builder for configuring a multi-protocol stream.
2//!
3//! Provides a builder for creating a multi-protocol stream that produces
4//! protocol state update messages. It runs one synchronization worker per protocol
5//! and a supervisor that aggregates updates, ensuring gap‑free streaming
6//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps a `TychoStream` from `tycho-client`. It decodes `FeedMessage`s
11//! into protocol state updates. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//! set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
29//! Aggregating by block adds small latency, since the supervisor waits briefly for
30//! all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
31//!
32//! The stream ends only when **all** workers are `Stale` or `Ended`.
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide token metadata up front so the decoder can initialize protocol states from startup
43//! snapshots. `set_tokens` does not act as an ongoing filter — components arriving after startup
44//! include their own token metadata. To restrict processing to specific tokens, apply that filter
45//! in your consumer when reading `new_components`. New tokens arriving via stream deltas are added
46//! automatically when their quality exceeds `min_token_quality`.
47//!
48//! ### StreamEndPolicy
49//! Control when the stream ends based on worker states. By default, it ends when all
50//! workers are `Stale` or `Ended`.
51//!
52//! ## Stream
53//! The stream emits one protocol state update every `block_time`. Each update
54//! reports protocol synchronization states and any changes.
55//!
56//! The `new_components` field lists newly deployed components and their tokens.
57//!
58//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
59//! most errors, so users should rarely need to restart it manually.
60//!
61//! ## Example
62//! ```no_run
63//! use tycho_common::models::Chain;
64//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
65//! use tycho_simulation::utils::load_all_tokens;
66//! use futures::StreamExt;
67//! use tycho_client::feed::component_tracker::ComponentFilter;
68//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
69//! use std::collections::HashSet;
70//!
71//! #[tokio::main]
72//! async fn main() {
73//! let all_tokens = load_all_tokens(
74//! "tycho-beta.propellerheads.xyz",
75//! false,
76//! Some("sampletoken"),
77//! true,
78//! Chain::Ethereum,
79//! None,
80//! None,
81//! )
82//! .await
83//! .expect("Failed loading tokens");
84//!
85//! let protocol_stream =
86//! ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
87//! .auth_key(Some("sampletoken".to_string()))
88//! .skip_state_decode_failures(true)
89//! .exchange::<UniswapV2State>(
90//! "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
91//! )
92//! .blocklist_components(HashSet::new())
93//! .set_tokens(all_tokens)
94//! .await
95//! .build()
96//! .await
97//! .expect("Failed building protocol stream");
98//! tokio::pin!(protocol_stream);
99//!
100//! // Loop through block updates
101//! while let Some(msg) = protocol_stream.next().await {
102//! dbg!(msg).expect("failed decoding");
103//! }
104//! }
105//! ```
106use std::{
107 collections::{HashMap, HashSet},
108 sync::Arc,
109 time,
110};
111
112use futures::{future::Either, stream, Stream, StreamExt};
113use tokio_stream::wrappers::ReceiverStream;
114use tracing::{debug, error, warn};
115use tycho_client::{
116 feed::{
117 component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
118 BlockSynchronizerError, FeedMessage, SynchronizerState,
119 },
120 stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
121};
122use tycho_common::{
123 models::{token::Token, Chain},
124 simulation::protocol_sim::ProtocolSim,
125 traits::TxDeltaIndexer,
126 Bytes,
127};
128
129use crate::{
130 evm::{
131 decoder::{StreamDecodeError, TychoStreamDecoder},
132 pending::PendingBlockProcessor,
133 protocol::{
134 native_wrapper::state::NativeWrapperState,
135 uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
136 },
137 },
138 protocol::{
139 errors::InvalidSnapshotError,
140 models::{DecoderContext, TryFromWithBlock, Update},
141 },
142};
143
144const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
145
146#[derive(Default, Debug, Clone, Copy)]
147pub enum StreamEndPolicy {
148 /// End stream if all states are Stale or Ended (default)
149 #[default]
150 AllEndedOrStale,
151 /// End stream if any protocol ended
152 AnyEnded,
153 /// End stream if any protocol ended or is stale
154 AnyEndedOrStale,
155 /// End stream if any protocol is stale
156 AnyStale,
157}
158
159impl StreamEndPolicy {
160 fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
161 let mut it = states.into_iter();
162 match self {
163 StreamEndPolicy::AllEndedOrStale => false,
164 StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
165 StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
166 StreamEndPolicy::AnyEndedOrStale => {
167 it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
168 }
169 }
170 }
171}
172
173/// Handle returned by [`ProtocolStreamBuilder::with_step_controller`] that gives external
174/// control over when each buffered block is released for decoding.
175///
176/// Intended for complex test scenarios where the caller needs to observe what the next
177/// block contains before allowing the decoder pipeline to process it.
178///
179/// ## Drop behaviour
180///
181/// Dropping this controller ungates the stream: the gating task detects the closed trigger
182/// channel, forwards the currently-buffered block (if any), then continues passing subsequent
183/// blocks through without waiting for triggers — exactly as if step-control had never been
184/// enabled. The stream runs to its natural end.
185pub struct BlockStepController {
186 /// Sends a trigger signal to release the next buffered block.
187 trigger_tx: tokio::sync::mpsc::UnboundedSender<()>,
188 /// Watch channel containing the next buffered raw message, or `None` if no block is pending.
189 peek_rx: tokio::sync::watch::Receiver<Option<FeedMessage<BlockHeader>>>,
190}
191
192impl BlockStepController {
193 /// Releases the next buffered block for decoding and emission.
194 ///
195 /// Returns an error if the stream has already ended and the sender is disconnected.
196 pub fn trigger_next_block(&self) -> Result<(), tokio::sync::mpsc::error::SendError<()>> {
197 // Send a unit value on the trigger channel to unblock the gating task.
198 self.trigger_tx.send(())
199 }
200
201 /// Returns the currently buffered block immediately, or `None` if no block is buffered yet.
202 pub fn try_peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
203 self.peek_rx.borrow().clone()
204 }
205
206 /// Waits until a block is buffered and returns it without consuming it.
207 ///
208 /// Returns `None` only if the stream has ended and no further blocks will arrive.
209 /// If a block is already buffered when this is called, it returns immediately.
210 pub async fn peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
211 // Clone so we don't hold a mutable borrow on self; wait_for checks the current
212 // value first, so this returns immediately if a block is already present.
213 let mut rx = self.peek_rx.clone();
214 let guard = rx
215 .wait_for(|v| v.is_some())
216 .await
217 .ok()?;
218 guard.clone()
219 }
220}
221
222/// Builds and configures the multi protocol stream described in the [module-level docs](self).
223///
224/// See the module documentation for details on protocols, configuration options, and
225/// stream behavior.
226pub struct ProtocolStreamBuilder {
227 decoder: TychoStreamDecoder<BlockHeader>,
228 stream_builder: TychoStreamBuilder,
229 stream_end_policy: StreamEndPolicy,
230 chain: Chain,
231 pending_indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
232 /// Watch sender used to publish the currently-buffered raw block so the controller can peek
233 /// at it before triggering. `Some` iff step-control mode is active.
234 step_peek_tx: Option<tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>>,
235 /// Receiver half of the trigger channel. Held here until `build()` / `build_with_pending()`
236 /// transfers ownership to the gating task. `Some` iff step-control mode is active.
237 step_trigger_rx: Option<tokio::sync::mpsc::UnboundedReceiver<()>>,
238}
239
240impl ProtocolStreamBuilder {
241 /// Creates a new builder for a multi-protocol stream.
242 ///
243 /// See the [module-level docs](self) for full details on stream behavior and configuration.
244 pub fn new(tycho_url: &str, chain: Chain) -> Self {
245 Self {
246 decoder: TychoStreamDecoder::new(),
247 stream_builder: TychoStreamBuilder::new(tycho_url, chain),
248 stream_end_policy: StreamEndPolicy::default(),
249 chain,
250 pending_indexers: HashMap::new(),
251 step_peek_tx: None,
252 step_trigger_rx: None,
253 }
254 }
255
256 /// Adds a specific exchange to the stream.
257 ///
258 /// This configures the builder to include a new protocol synchronizer for `name`,
259 /// filtering its components according to `filter` and optionally `filter_fn`.
260 ///
261 /// The type parameter `T` specifies the decoder type for this exchange. All
262 /// component states for this exchange will be decoded into instances of `T`.
263 ///
264 /// # Parameters
265 ///
266 /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
267 /// - `filter`: Defines the set of components to include in the stream.
268 /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
269 /// expressible in `filter`.
270 ///
271 /// # Notes
272 ///
273 /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
274 /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
275 /// filter function is required to ensure correct decoding and quoting logic.
276 pub fn exchange<T>(
277 mut self,
278 name: &str,
279 filter: ComponentFilter,
280 filter_fn: Option<fn(&ComponentWithState) -> bool>,
281 ) -> Self
282 where
283 T: ProtocolSim
284 + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
285 + Send
286 + 'static,
287 {
288 self.stream_builder = self
289 .stream_builder
290 .exchange(name, filter);
291 self.decoder.register_decoder::<T>(name);
292 if let Some(predicate) = filter_fn {
293 self.decoder
294 .register_filter(name, predicate);
295 }
296
297 if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
298 warn!(
299 "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
300 name
301 );
302 }
303
304 self
305 }
306
307 /// Adds a specific exchange to the stream with decoder context.
308 ///
309 /// This configures the builder to include a new protocol synchronizer for `name`,
310 /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
311 /// the DecoderContext (this is useful to test protocols that are not live yet)
312 ///
313 /// The type parameter `T` specifies the decoder type for this exchange. All
314 /// component states for this exchange will be decoded into instances of `T`.
315 ///
316 /// # Parameters
317 ///
318 /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
319 /// - `filter`: Defines the set of components to include in the stream.
320 /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
321 /// expressible in `filter`.
322 /// - `decoder_context`: The decoder context for this exchange
323 ///
324 /// # Notes
325 ///
326 /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
327 /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
328 /// filter function is required to ensure correct decoding and quoting logic.
329 pub fn exchange_with_decoder_context<T>(
330 mut self,
331 name: &str,
332 filter: ComponentFilter,
333 filter_fn: Option<fn(&ComponentWithState) -> bool>,
334 decoder_context: DecoderContext,
335 ) -> Self
336 where
337 T: ProtocolSim
338 + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
339 + Send
340 + 'static,
341 {
342 self.stream_builder = self
343 .stream_builder
344 .exchange(name, filter);
345 self.decoder
346 .register_decoder_with_context::<T>(name, decoder_context);
347 if let Some(predicate) = filter_fn {
348 self.decoder
349 .register_filter(name, predicate);
350 }
351
352 if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
353 warn!(
354 "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
355 name
356 );
357 }
358
359 self
360 }
361
362 /// Sets the block time interval for the stream.
363 ///
364 /// This controls how often the stream produces updates.
365 pub fn block_time(mut self, block_time: u64) -> Self {
366 self.stream_builder = self
367 .stream_builder
368 .block_time(block_time);
369 self
370 }
371
372 /// Sets the network operation timeout (deprecated).
373 ///
374 /// Use [`latency_buffer()`](Self::latency_buffer) instead for controlling latency.
375 /// This method is retained for backwards compatibility.
376 #[deprecated = "Use latency_buffer instead"]
377 pub fn timeout(mut self, timeout: u64) -> Self {
378 self.stream_builder = self.stream_builder.timeout(timeout);
379 self
380 }
381
382 /// Sets the latency buffer to aggregate same-block messages.
383 ///
384 /// This allows the supervisor to wait a short interval for all synchronizers to emit
385 /// before aggregating.
386 pub fn latency_buffer(mut self, timeout: u64) -> Self {
387 self.stream_builder = self.stream_builder.timeout(timeout);
388 self
389 }
390
391 /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
392 pub fn max_missed_blocks(mut self, n: u64) -> Self {
393 self.stream_builder = self.stream_builder.max_missed_blocks(n);
394 self
395 }
396
397 /// Sets how long a synchronizer may take to process the initial message.
398 ///
399 /// Useful for data-intensive protocols where startup decoding takes longer.
400 pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
401 self.stream_builder = self
402 .stream_builder
403 .startup_timeout(timeout);
404 self
405 }
406
407 /// Configures the stream to exclude state updates.
408 ///
409 /// This reduces bandwidth and decoding workload if protocol state is not of
410 /// interest (e.g. only process new tokens).
411 pub fn no_state(mut self, no_state: bool) -> Self {
412 self.stream_builder = self.stream_builder.no_state(no_state);
413 self
414 }
415
416 /// Sets the API key for authenticating with the Tycho server.
417 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
418 self.stream_builder = self.stream_builder.auth_key(auth_key);
419 self
420 }
421
422 /// Disables TLS/ SSL for the connection, using http and ws protocols.
423 ///
424 /// This is not recommended for production use.
425 pub fn no_tls(mut self, no_tls: bool) -> Self {
426 self.stream_builder = self.stream_builder.no_tls(no_tls);
427 self
428 }
429
430 /// Disable compression for the connection.
431 pub fn disable_compression(mut self) -> Self {
432 self.stream_builder = self
433 .stream_builder
434 .disable_compression();
435 self
436 }
437
438 /// Enables partial block updates (flashblocks).
439 pub fn enable_partial_blocks(mut self) -> Self {
440 self.stream_builder = self
441 .stream_builder
442 .enable_partial_blocks();
443 self
444 }
445
446 /// Exclude specific component IDs from all registered exchanges.
447 pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
448 if !ids.is_empty() {
449 tracing::info!("Blocklisting {} components", ids.len());
450 self.stream_builder = self.stream_builder.blocklisted_ids(ids);
451 }
452 self
453 }
454
455 /// Sets the stream end policy.
456 ///
457 /// Controls when the stream should stop based on synchronizer states.
458 ///
459 /// ## Note
460 /// The stream always ends latest if all protocols are stale or ended independent of
461 /// this configuration. This allows you to end the stream earlier than that.
462 ///
463 /// See [self::StreamEndPolicy] for possible configuration options.
464 pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
465 self.stream_end_policy = stream_end_policy;
466 self
467 }
468
469 /// Provides token metadata used to decode startup snapshots and initialize protocol states.
470 ///
471 /// This is not a stream filter — components arriving after startup include their own token
472 /// metadata. To restrict to specific tokens, filter in your consumer logic. New tokens
473 /// arriving via stream deltas are added automatically if they meet the quality threshold.
474 pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
475 self.decoder.set_tokens(tokens).await;
476 self
477 }
478
479 /// Skips decoding errors for component state updates.
480 ///
481 /// Allows the stream to continue processing even if some states fail to decode,
482 /// logging a warning instead of panicking.
483 pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
484 self.decoder
485 .skip_state_decode_failures(skip);
486 self
487 }
488
489 /// Sets the minimum token quality for tokens added via the stream.
490 ///
491 /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
492 /// Set this to the same value used in [`load_all_tokens()`](crate::utils::load_all_tokens) to
493 /// apply consistent filtering.
494 pub fn min_token_quality(mut self, quality: u32) -> Self {
495 self.decoder.min_token_quality(quality);
496 self
497 }
498
499 /// Configures the retry policy for websocket reconnects.
500 pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
501 self.stream_builder = self
502 .stream_builder
503 .websockets_retry_config(config);
504 self
505 }
506
507 /// Configures the retry policy for state synchronization.
508 pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
509 self.stream_builder = self
510 .stream_builder
511 .state_synchronizer_retry_config(config);
512 self
513 }
514
515 pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
516 &self.decoder
517 }
518
519 /// Registers a [`TxDeltaIndexer`] for ephemeral pending-block simulation.
520 ///
521 /// The indexer is associated with `extractor` (the protocol synchronizer name, e.g.
522 /// `"uniswap_v3"`). Use [`build_with_pending`](Self::build_with_pending) to obtain both
523 /// the confirmed stream and the pending processor.
524 ///
525 /// Returns an error if `extractor` names a VM protocol (prefix `"vm:"`), which requires
526 /// `update_engine()` and cannot be simulated natively.
527 pub fn with_pending_indexer(
528 mut self,
529 extractor: &str,
530 indexer: Box<dyn TxDeltaIndexer>,
531 ) -> Result<Self, StreamError> {
532 if extractor.starts_with("vm:") {
533 return Err(StreamError::SetUpError(format!(
534 "extractor '{extractor}' is a VM protocol; TxDeltaIndexer only supports native protocols"
535 )));
536 }
537 self.pending_indexers
538 .insert(extractor.to_string(), indexer);
539 Ok(self)
540 }
541
542 /// Enables controlled-step mode for testing.
543 ///
544 /// Returns a [`BlockStepController`] that lets the caller decide when each buffered block
545 /// is released for decoding. Call this before [`build`](Self::build) or
546 /// [`build_with_pending`](Self::build_with_pending) — both detect and wire up the gating
547 /// automatically.
548 ///
549 /// In production code, do not call this method; the stream runs at full speed.
550 pub fn with_step_controller(mut self) -> (Self, BlockStepController) {
551 let (trigger_tx, trigger_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
552 let (peek_tx, peek_rx) =
553 tokio::sync::watch::channel::<Option<FeedMessage<BlockHeader>>>(None);
554
555 self.step_peek_tx = Some(peek_tx);
556 self.step_trigger_rx = Some(trigger_rx);
557
558 let controller = BlockStepController { trigger_tx, peek_rx };
559 (self, controller)
560 }
561
562 /// Spawns a background task that gates `FeedMessage` delivery.
563 ///
564 /// The task buffers each incoming message, publishes it to `peek_tx` so the
565 /// [`BlockStepController`] can inspect it, waits for a trigger, then forwards the message to
566 /// `output_tx` for the decode pipeline. If `advance_tx` is `Some`, a clone of the message is
567 /// also forwarded there (used by the pending-processor path) before the decode step.
568 /// When the input channel closes or a terminal error is received according to
569 /// `stream_end_policy`, the task exits and all output channels are dropped.
570 fn run_gating_task(
571 raw_rx: tokio::sync::mpsc::Receiver<
572 Result<FeedMessage<BlockHeader>, BlockSynchronizerError>,
573 >,
574 mut trigger_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
575 peek_tx: tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>,
576 output_tx: tokio::sync::mpsc::Sender<FeedMessage<BlockHeader>>,
577 stream_end_policy: StreamEndPolicy,
578 ) {
579 tokio::spawn(async move {
580 let mut raw_stream = ReceiverStream::new(raw_rx);
581 loop {
582 let msg = match raw_stream.next().await {
583 Some(Ok(msg)) => msg,
584 Some(Err(e)) => {
585 error!("Block stream ended with terminal error: {e}");
586 break;
587 }
588 None => break,
589 };
590
591 if stream_end_policy.should_end(msg.sync_states.values()) {
592 error!(
593 "Block stream ended due to {:?}: {:?}",
594 stream_end_policy, msg.sync_states
595 );
596 break;
597 }
598
599 // Publish the buffered message so the caller can peek before triggering.
600 let _ = peek_tx.send(Some(msg.clone()));
601
602 // Block until the controller fires trigger_next_block(), or until it is dropped.
603 if trigger_rx.recv().await.is_none() {
604 // Controller dropped — forward the buffered message and drain the rest
605 // without gating, so the stream continues to its natural end.
606 let _ = peek_tx.send(None);
607 if output_tx.send(msg).await.is_err() {
608 break;
609 }
610 while let Some(item) = raw_stream.next().await {
611 let Ok(msg) = item else { break };
612 if stream_end_policy.should_end(msg.sync_states.values()) {
613 break;
614 }
615 if output_tx.send(msg).await.is_err() {
616 break;
617 }
618 }
619 break;
620 }
621
622 // Clear the peek slot before decoding so callers see None between blocks.
623 let _ = peek_tx.send(None);
624
625 if output_tx.send(msg).await.is_err() {
626 break;
627 }
628 }
629 });
630 }
631
632 /// Builds the confirmed protocol stream and a [`PendingBlockProcessor`] that stays
633 /// in sync with it automatically.
634 ///
635 /// The stream pipeline forwards every confirmed [`FeedMessage`] to the processor via an
636 /// internal unbounded channel — it never blocks waiting for the consumer. The consumer
637 /// owns the returned `PendingBlockProcessor` exclusively and may wrap it in whatever
638 /// synchronisation primitive suits their use case (e.g. `Mutex` for shared access,
639 /// nothing for single-threaded use).
640 ///
641 /// Call [`generate_pending_update`](PendingBlockProcessor::generate_pending_update) to
642 /// simulate a candidate bundle; it drains the channel automatically before computing.
643 pub async fn build_with_pending(
644 self,
645 ) -> Result<
646 (impl Stream<Item = Result<Update, StreamDecodeError>>, PendingBlockProcessor),
647 StreamError,
648 > {
649 initialize_hook_handlers().map_err(|e| {
650 StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
651 })?;
652 let (_, rx) = self.stream_builder.build().await?;
653 let decoder = Arc::new(self.decoder);
654
655 let (advance_tx, advance_rx) =
656 tokio::sync::mpsc::unbounded_channel::<FeedMessage<BlockHeader>>();
657 let pending = PendingBlockProcessor::new(
658 self.pending_indexers,
659 decoder.clone(),
660 self.chain,
661 advance_rx,
662 );
663
664 let chain = self.chain;
665 let stream_end_policy = self.stream_end_policy;
666
667 let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
668 if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
669 let (gated_tx, gated_rx) =
670 tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
671 Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
672 Box::new(ReceiverStream::new(gated_rx))
673 } else {
674 let normal = ReceiverStream::new(rx)
675 .take_while(move |msg| match msg {
676 Ok(msg) => {
677 let states = msg.sync_states.values();
678 if stream_end_policy.should_end(states) {
679 error!(
680 "Block stream ended due to {:?}: {:?}",
681 stream_end_policy, msg.sync_states
682 );
683 futures::future::ready(false)
684 } else {
685 futures::future::ready(true)
686 }
687 }
688 Err(e) => {
689 error!("Block stream ended with terminal error: {e}");
690 futures::future::ready(false)
691 }
692 })
693 .map(|msg| msg.expect("Safe since stream ends if we receive an error"));
694 Box::new(Box::pin(normal))
695 };
696
697 let stream = Box::pin(decode_stream.then({
698 let decoder = decoder.clone();
699 move |msg| {
700 let decoder = decoder.clone();
701 let advance_tx = advance_tx.clone();
702 async move {
703 let _ = advance_tx.send(msg.clone());
704 decoder.decode(&msg).await.map_err(|e| {
705 debug!(msg=?msg, "Decode error: {}", e);
706 e
707 })
708 }
709 }
710 }));
711 let stream = inject_native_wrapper(stream, chain);
712 Ok((stream, pending))
713 }
714
715 /// Builds and returns the configured protocol stream.
716 ///
717 /// See the module-level docs for details on stream behavior and emitted messages.
718 /// This method applies all builder settings and starts the stream.
719 pub async fn build(
720 self,
721 ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
722 initialize_hook_handlers().map_err(|e| {
723 StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
724 })?;
725 let (_, rx) = self.stream_builder.build().await?;
726 let decoder = Arc::new(self.decoder);
727 let chain = self.chain;
728 let stream_end_policy = self.stream_end_policy;
729
730 let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
731 if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
732 let (gated_tx, gated_rx) =
733 tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
734 Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
735 Box::new(ReceiverStream::new(gated_rx))
736 } else {
737 let normal = ReceiverStream::new(rx)
738 .take_while(move |msg| match msg {
739 Ok(msg) => {
740 let states = msg.sync_states.values();
741 if stream_end_policy.should_end(states) {
742 error!(
743 "Block stream ended due to {:?}: {:?}",
744 stream_end_policy, msg.sync_states
745 );
746 futures::future::ready(false)
747 } else {
748 futures::future::ready(true)
749 }
750 }
751 Err(e) => {
752 error!("Block stream ended with terminal error: {e}");
753 futures::future::ready(false)
754 }
755 })
756 .map(|msg| msg.expect("Safe since stream ends if we receive an error"));
757 Box::new(Box::pin(normal))
758 };
759
760 let stream = Box::pin(decode_stream.then({
761 let decoder = decoder.clone();
762 move |msg| {
763 let decoder = decoder.clone();
764 async move {
765 decoder.decode(&msg).await.map_err(|e| {
766 debug!(msg=?msg, "Decode error: {}", e);
767 e
768 })
769 }
770 }
771 }));
772 let stream = inject_native_wrapper(stream, chain);
773 Ok(stream)
774 }
775}
776
777/// Wraps a decoded protocol stream to inject a `NativeWrapperState` component
778/// on the first successful update.
779///
780/// Skips injection for chains where the native and wrapped-native tokens share
781/// the same address (e.g. Starknet).
782fn inject_native_wrapper(
783 inner: impl Stream<Item = Result<Update, StreamDecodeError>> + Unpin + Send + 'static,
784 chain: Chain,
785) -> impl Stream<Item = Result<Update, StreamDecodeError>> + Send {
786 let has_distinct_wrapper = chain.native_token().address != chain.wrapped_native_token().address;
787 if !has_distinct_wrapper {
788 return Either::Left(inner);
789 }
790
791 Either::Right(
792 stream::once(async move {
793 let mut inner = inner;
794 let first = inner.next().await;
795 let modified = first.into_iter().map(move |result| {
796 result.map(|mut update| {
797 let component = NativeWrapperState::component(chain);
798 let id = component.id.to_string();
799 update
800 .new_pairs
801 .insert(id.clone(), component);
802 update
803 .states
804 .insert(id, Box::new(NativeWrapperState::new(chain)));
805 debug!("Injected native_wrapper component for {chain}");
806 update
807 })
808 });
809 stream::iter(modified).chain(inner)
810 })
811 .flatten(),
812 )
813}
814
815#[cfg(test)]
816mod tests {
817 use std::collections::HashMap;
818
819 use futures::{stream, StreamExt};
820 use tycho_common::models::Chain;
821
822 use super::*;
823 use crate::protocol::models::Update;
824
825 fn empty_update(block: u64) -> Update {
826 Update::new(block, HashMap::new(), HashMap::new())
827 }
828
829 #[tokio::test]
830 async fn test_inject_native_wrapper_first_message_only() {
831 let updates = vec![Ok(empty_update(1)), Ok(empty_update(2)), Ok(empty_update(3))];
832 let input = stream::iter(updates);
833
834 let results: Vec<_> = inject_native_wrapper(input, Chain::Ethereum)
835 .collect()
836 .await;
837
838 assert_eq!(results.len(), 3);
839
840 let expected_id = NativeWrapperState::component(Chain::Ethereum)
841 .id
842 .to_string();
843
844 let first = results[0]
845 .as_ref()
846 .expect("first update ok");
847 assert!(
848 first
849 .new_pairs
850 .contains_key(&expected_id),
851 "first message should have native_wrapper component"
852 );
853 assert!(
854 first.states.contains_key(&expected_id),
855 "first message should have native_wrapper state"
856 );
857
858 let second = results[1]
859 .as_ref()
860 .expect("second update ok");
861 assert!(
862 !second
863 .new_pairs
864 .contains_key(&expected_id),
865 "second message should NOT have native_wrapper component"
866 );
867 assert!(
868 !second.states.contains_key(&expected_id),
869 "second message should NOT have native_wrapper state"
870 );
871 }
872
873 /// Verifies that `with_step_controller` returns both a modified builder and a controller.
874 ///
875 /// This test only checks that the builder method is callable and that the returned controller
876 /// compiles — it does not start any network connection.
877 #[tokio::test]
878 async fn test_with_step_controller_returns_controller() {
879 let builder = ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum);
880 let (_builder, controller) = builder.with_step_controller();
881 // The controller was successfully returned — verifying the public API is callable.
882 drop(controller);
883 }
884
885 /// Connects to a live Tycho instance, verifies that the stream blocks until
886 /// `trigger_next_block` is called, and that `peek_next_block` exposes the buffered message.
887 #[ignore = "requires live Tycho connection (TYCHO_AUTH_TOKEN env var)"]
888 #[tokio::test]
889 async fn test_step_controller_trigger_releases_block() {
890 use std::{env, time::Duration};
891
892 use crate::evm::protocol::uniswap_v2::state::UniswapV2State;
893
894 let auth = env::var("TYCHO_AUTH_TOKEN").expect("TYCHO_AUTH_TOKEN must be set");
895
896 // Track a single well-known pool to minimise startup latency.
897 let usdc_weth_v2 = "0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc".to_string();
898 let (builder, controller) =
899 ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
900 .auth_key(Some(auth))
901 .exchange::<UniswapV2State>(
902 "uniswap_v2",
903 ComponentFilter::Ids(vec![usdc_weth_v2]),
904 None,
905 )
906 .with_step_controller();
907
908 let (stream, _pending) = builder
909 .build_with_pending()
910 .await
911 .expect("build_with_pending failed");
912 tokio::pin!(stream);
913
914 // Wait up to 60 s for the first block to arrive in the gating buffer.
915 let peeked = tokio::time::timeout(Duration::from_secs(60), controller.peek_next_block())
916 .await
917 .expect("timed out waiting for first block to buffer")
918 .expect("stream ended before a block arrived");
919
920 assert!(!peeked.sync_states.is_empty(), "peeked block should carry sync states");
921
922 // Stream must be empty before we trigger — the gating task should be holding the block.
923 let pre_trigger = tokio::time::timeout(Duration::from_millis(200), stream.next()).await;
924 assert!(
925 pre_trigger.is_err(),
926 "stream should be blocked before trigger_next_block, got an item"
927 );
928
929 // Release the block.
930 controller
931 .trigger_next_block()
932 .expect("trigger_next_block failed");
933
934 // Stream should now yield the decoded update within one block time.
935 let update = tokio::time::timeout(Duration::from_secs(30), stream.next())
936 .await
937 .expect("timed out waiting for update after trigger")
938 .expect("stream ended unexpectedly");
939
940 assert!(update.is_ok(), "decoded update should be Ok, got: {:?}", update);
941 }
942}