Skip to main content

tycho_simulation/evm/
stream.rs

//! Builder for configuring a multi-protocol stream.
//!
//! Provides a builder for creating a multi-protocol stream that produces
//! protocol state update messages. It runs one synchronization worker per protocol
//! and a supervisor that aggregates updates, ensuring gap-free streaming
//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps a `TychoStream` from `tycho-client`. It decodes `FeedMessage`s
11//! into protocol state updates. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//!   set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
//! Aggregating by block adds a small amount of latency, since the supervisor waits briefly
//! for all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
//!
//! By default, the stream ends only when **all** workers are `Stale` or `Ended`; a
//! `StreamEndPolicy` can be configured to end it earlier.
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide token metadata up front so the decoder can initialize protocol states from startup
43//! snapshots. `set_tokens` does not act as an ongoing filter — components arriving after startup
44//! include their own token metadata. To restrict processing to specific tokens, apply that filter
45//! in your consumer when reading `new_components`. New tokens arriving via stream deltas are added
46//! automatically when their quality exceeds `min_token_quality`.
47//!
48//! ### StreamEndPolicy
49//! Control when the stream ends based on worker states. By default, it ends when all
50//! workers are `Stale` or `Ended`.
51//!
52//! ## Stream
53//! The stream emits one protocol state update every `block_time`. Each update
54//! reports protocol synchronization states and any changes.
55//!
56//! The `new_components` field lists newly deployed components and their tokens.
57//!
58//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
59//! most errors, so users should rarely need to restart it manually.
60//!
61//! ## Example
62//! ```no_run
63//! use tycho_common::models::Chain;
64//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
65//! use tycho_simulation::utils::load_all_tokens;
66//! use futures::StreamExt;
67//! use tycho_client::feed::component_tracker::ComponentFilter;
68//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
69//! use std::collections::HashSet;
70//!
71//! #[tokio::main]
72//! async fn main() {
73//!     let all_tokens = load_all_tokens(
74//!         "tycho-beta.propellerheads.xyz",
75//!         false,
76//!         Some("sampletoken"),
77//!         true,
78//!         Chain::Ethereum,
79//!         None,
80//!         None,
81//!     )
82//!     .await
83//!     .expect("Failed loading tokens");
84//!
85//!     let protocol_stream =
86//!         ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
87//!             .auth_key(Some("sampletoken".to_string()))
88//!             .skip_state_decode_failures(true)
89//!             .exchange::<UniswapV2State>(
90//!                 "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
91//!             )
92//!             .blocklist_components(HashSet::new())
93//!             .set_tokens(all_tokens)
94//!             .await
95//!             .build()
96//!             .await
97//!             .expect("Failed building protocol stream");
98//!     tokio::pin!(protocol_stream);
99//!
100//!     // Loop through block updates
101//!     while let Some(msg) = protocol_stream.next().await {
102//!         dbg!(msg).expect("failed decoding");
103//!     }
104//! }
105//! ```
106use std::{
107    collections::{HashMap, HashSet},
108    sync::Arc,
109    time,
110};
111
112use futures::{future::Either, stream, Stream, StreamExt};
113use tokio_stream::wrappers::ReceiverStream;
114use tracing::{debug, error, warn};
115use tycho_client::{
116    feed::{
117        component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
118        BlockSynchronizerError, FeedMessage, SynchronizerState,
119    },
120    stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
121};
122use tycho_common::{
123    models::{token::Token, Chain},
124    simulation::protocol_sim::ProtocolSim,
125    traits::TxDeltaIndexer,
126    Bytes,
127};
128
129use crate::{
130    evm::{
131        decoder::{StreamDecodeError, TychoStreamDecoder},
132        pending::PendingBlockProcessor,
133        protocol::{
134            native_wrapper::state::NativeWrapperState,
135            uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
136        },
137    },
138    protocol::{
139        errors::InvalidSnapshotError,
140        models::{DecoderContext, TryFromWithBlock, Update},
141    },
142};
143
/// Exchange names for which registering without a `filter_fn` logs a warning, because not
/// all of their pools are supported.
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
145
/// Selects which worker states cause the stream to be ended early.
#[derive(Debug, Default, Clone, Copy)]
pub enum StreamEndPolicy {
    /// End the stream once every state is `Stale` or `Ended`. This is the default.
    #[default]
    AllEndedOrStale,
    /// End the stream as soon as any protocol has ended.
    AnyEnded,
    /// End the stream as soon as any protocol has ended or gone stale.
    AnyEndedOrStale,
    /// End the stream as soon as any protocol has gone stale.
    AnyStale,
}
158
159impl StreamEndPolicy {
160    fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
161        let mut it = states.into_iter();
162        match self {
163            StreamEndPolicy::AllEndedOrStale => false,
164            StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
165            StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
166            StreamEndPolicy::AnyEndedOrStale => {
167                it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
168            }
169        }
170    }
171}
172
173/// Handle returned by [`ProtocolStreamBuilder::with_step_controller`] that gives external
174/// control over when each buffered block is released for decoding.
175///
176/// Intended for complex test scenarios where the caller needs to observe what the next
177/// block contains before allowing the decoder pipeline to process it.
178///
179/// ## Drop behaviour
180///
181/// Dropping this controller ungates the stream: the gating task detects the closed trigger
182/// channel, forwards the currently-buffered block (if any), then continues passing subsequent
183/// blocks through without waiting for triggers — exactly as if step-control had never been
184/// enabled. The stream runs to its natural end.
185pub struct BlockStepController {
186    /// Sends a trigger signal to release the next buffered block.
187    trigger_tx: tokio::sync::mpsc::UnboundedSender<()>,
188    /// Watch channel containing the next buffered raw message, or `None` if no block is pending.
189    peek_rx: tokio::sync::watch::Receiver<Option<FeedMessage<BlockHeader>>>,
190}
191
192impl BlockStepController {
193    /// Releases the next buffered block for decoding and emission.
194    ///
195    /// Returns an error if the stream has already ended and the sender is disconnected.
196    pub fn trigger_next_block(&self) -> Result<(), tokio::sync::mpsc::error::SendError<()>> {
197        // Send a unit value on the trigger channel to unblock the gating task.
198        self.trigger_tx.send(())
199    }
200
201    /// Returns the currently buffered block immediately, or `None` if no block is buffered yet.
202    pub fn try_peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
203        self.peek_rx.borrow().clone()
204    }
205
206    /// Waits until a block is buffered and returns it without consuming it.
207    ///
208    /// Returns `None` only if the stream has ended and no further blocks will arrive.
209    /// If a block is already buffered when this is called, it returns immediately.
210    pub async fn peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
211        // Clone so we don't hold a mutable borrow on self; wait_for checks the current
212        // value first, so this returns immediately if a block is already present.
213        let mut rx = self.peek_rx.clone();
214        let guard = rx
215            .wait_for(|v| v.is_some())
216            .await
217            .ok()?;
218        guard.clone()
219    }
220}
221
222/// Builds and configures the multi protocol stream described in the [module-level docs](self).
223///
224/// See the module documentation for details on protocols, configuration options, and
225/// stream behavior.
226pub struct ProtocolStreamBuilder {
227    decoder: TychoStreamDecoder<BlockHeader>,
228    stream_builder: TychoStreamBuilder,
229    stream_end_policy: StreamEndPolicy,
230    chain: Chain,
231    pending_indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
232    /// Watch sender used to publish the currently-buffered raw block so the controller can peek
233    /// at it before triggering. `Some` iff step-control mode is active.
234    step_peek_tx: Option<tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>>,
235    /// Receiver half of the trigger channel. Held here until `build()` / `build_with_pending()`
236    /// transfers ownership to the gating task. `Some` iff step-control mode is active.
237    step_trigger_rx: Option<tokio::sync::mpsc::UnboundedReceiver<()>>,
238}
239
240impl ProtocolStreamBuilder {
241    /// Creates a new builder for a multi-protocol stream.
242    ///
243    /// See the [module-level docs](self) for full details on stream behavior and configuration.
244    pub fn new(tycho_url: &str, chain: Chain) -> Self {
245        Self {
246            decoder: TychoStreamDecoder::new(),
247            stream_builder: TychoStreamBuilder::new(tycho_url, chain),
248            stream_end_policy: StreamEndPolicy::default(),
249            chain,
250            pending_indexers: HashMap::new(),
251            step_peek_tx: None,
252            step_trigger_rx: None,
253        }
254    }
255
256    /// Adds a specific exchange to the stream.
257    ///
258    /// This configures the builder to include a new protocol synchronizer for `name`,
259    /// filtering its components according to `filter` and optionally `filter_fn`.
260    ///
261    /// The type parameter `T` specifies the decoder type for this exchange. All
262    /// component states for this exchange will be decoded into instances of `T`.
263    ///
264    /// # Parameters
265    ///
266    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
267    /// - `filter`: Defines the set of components to include in the stream.
268    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
269    ///   expressible in `filter`.
270    ///
271    /// # Notes
272    ///
273    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
274    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
275    /// filter function is required to ensure correct decoding and quoting logic.
276    pub fn exchange<T>(
277        mut self,
278        name: &str,
279        filter: ComponentFilter,
280        filter_fn: Option<fn(&ComponentWithState) -> bool>,
281    ) -> Self
282    where
283        T: ProtocolSim
284            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
285            + Send
286            + 'static,
287    {
288        self.stream_builder = self
289            .stream_builder
290            .exchange(name, filter);
291        self.decoder.register_decoder::<T>(name);
292        if let Some(predicate) = filter_fn {
293            self.decoder
294                .register_filter(name, predicate);
295        }
296
297        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
298            warn!(
299                "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
300                name
301            );
302        }
303
304        self
305    }
306
307    /// Adds a specific exchange to the stream with decoder context.
308    ///
309    /// This configures the builder to include a new protocol synchronizer for `name`,
310    /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
311    /// the DecoderContext (this is useful to test protocols that are not live yet)
312    ///
313    /// The type parameter `T` specifies the decoder type for this exchange. All
314    /// component states for this exchange will be decoded into instances of `T`.
315    ///
316    /// # Parameters
317    ///
318    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
319    /// - `filter`: Defines the set of components to include in the stream.
320    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
321    ///   expressible in `filter`.
322    /// - `decoder_context`: The decoder context for this exchange
323    ///
324    /// # Notes
325    ///
326    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
327    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
328    /// filter function is required to ensure correct decoding and quoting logic.
329    pub fn exchange_with_decoder_context<T>(
330        mut self,
331        name: &str,
332        filter: ComponentFilter,
333        filter_fn: Option<fn(&ComponentWithState) -> bool>,
334        decoder_context: DecoderContext,
335    ) -> Self
336    where
337        T: ProtocolSim
338            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
339            + Send
340            + 'static,
341    {
342        self.stream_builder = self
343            .stream_builder
344            .exchange(name, filter);
345        self.decoder
346            .register_decoder_with_context::<T>(name, decoder_context);
347        if let Some(predicate) = filter_fn {
348            self.decoder
349                .register_filter(name, predicate);
350        }
351
352        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
353            warn!(
354                "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
355                name
356            );
357        }
358
359        self
360    }
361
362    /// Sets the block time interval for the stream.
363    ///
364    /// This controls how often the stream produces updates.
365    pub fn block_time(mut self, block_time: u64) -> Self {
366        self.stream_builder = self
367            .stream_builder
368            .block_time(block_time);
369        self
370    }
371
372    /// Sets the network operation timeout (deprecated).
373    ///
374    /// Use [`latency_buffer()`](Self::latency_buffer) instead for controlling latency.
375    /// This method is retained for backwards compatibility.
376    #[deprecated = "Use latency_buffer instead"]
377    pub fn timeout(mut self, timeout: u64) -> Self {
378        self.stream_builder = self.stream_builder.timeout(timeout);
379        self
380    }
381
382    /// Sets the latency buffer to aggregate same-block messages.
383    ///
384    /// This allows the supervisor to wait a short interval for all synchronizers to emit
385    /// before aggregating.
386    pub fn latency_buffer(mut self, timeout: u64) -> Self {
387        self.stream_builder = self.stream_builder.timeout(timeout);
388        self
389    }
390
391    /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
392    pub fn max_missed_blocks(mut self, n: u64) -> Self {
393        self.stream_builder = self.stream_builder.max_missed_blocks(n);
394        self
395    }
396
397    /// Sets how long a synchronizer may take to process the initial message.
398    ///
399    /// Useful for data-intensive protocols where startup decoding takes longer.
400    pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
401        self.stream_builder = self
402            .stream_builder
403            .startup_timeout(timeout);
404        self
405    }
406
407    /// Configures the stream to exclude state updates.
408    ///
409    /// This reduces bandwidth and decoding workload if protocol state is not of
410    /// interest (e.g. only process new tokens).
411    pub fn no_state(mut self, no_state: bool) -> Self {
412        self.stream_builder = self.stream_builder.no_state(no_state);
413        self
414    }
415
416    /// Sets the API key for authenticating with the Tycho server.
417    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
418        self.stream_builder = self.stream_builder.auth_key(auth_key);
419        self
420    }
421
422    /// Disables TLS/ SSL for the connection, using http and ws protocols.
423    ///
424    /// This is not recommended for production use.
425    pub fn no_tls(mut self, no_tls: bool) -> Self {
426        self.stream_builder = self.stream_builder.no_tls(no_tls);
427        self
428    }
429
430    /// Disable compression for the connection.
431    pub fn disable_compression(mut self) -> Self {
432        self.stream_builder = self
433            .stream_builder
434            .disable_compression();
435        self
436    }
437
438    /// Enables partial block updates (flashblocks).
439    pub fn enable_partial_blocks(mut self) -> Self {
440        self.stream_builder = self
441            .stream_builder
442            .enable_partial_blocks();
443        self
444    }
445
446    /// Exclude specific component IDs from all registered exchanges.
447    pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
448        if !ids.is_empty() {
449            tracing::info!("Blocklisting {} components", ids.len());
450            self.stream_builder = self.stream_builder.blocklisted_ids(ids);
451        }
452        self
453    }
454
455    /// Sets the stream end policy.
456    ///
457    /// Controls when the stream should stop based on synchronizer states.
458    ///
459    /// ## Note
460    /// The stream always ends latest if all protocols are stale or ended independent of
461    /// this configuration. This allows you to end the stream earlier than that.
462    ///
463    /// See [self::StreamEndPolicy] for possible configuration options.
464    pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
465        self.stream_end_policy = stream_end_policy;
466        self
467    }
468
469    /// Provides token metadata used to decode startup snapshots and initialize protocol states.
470    ///
471    /// This is not a stream filter — components arriving after startup include their own token
472    /// metadata. To restrict to specific tokens, filter in your consumer logic. New tokens
473    /// arriving via stream deltas are added automatically if they meet the quality threshold.
474    pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
475        self.decoder.set_tokens(tokens).await;
476        self
477    }
478
479    /// Skips decoding errors for component state updates.
480    ///
481    /// Allows the stream to continue processing even if some states fail to decode,
482    /// logging a warning instead of panicking.
483    pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
484        self.decoder
485            .skip_state_decode_failures(skip);
486        self
487    }
488
489    /// Sets the minimum token quality for tokens added via the stream.
490    ///
491    /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
492    /// Set this to the same value used in [`load_all_tokens()`](crate::utils::load_all_tokens) to
493    /// apply consistent filtering.
494    pub fn min_token_quality(mut self, quality: u32) -> Self {
495        self.decoder.min_token_quality(quality);
496        self
497    }
498
499    /// Configures the retry policy for websocket reconnects.
500    pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
501        self.stream_builder = self
502            .stream_builder
503            .websockets_retry_config(config);
504        self
505    }
506
507    /// Configures the retry policy for state synchronization.
508    pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
509        self.stream_builder = self
510            .stream_builder
511            .state_synchronizer_retry_config(config);
512        self
513    }
514
515    pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
516        &self.decoder
517    }
518
519    /// Registers a [`TxDeltaIndexer`] for ephemeral pending-block simulation.
520    ///
521    /// The indexer is associated with `extractor` (the protocol synchronizer name, e.g.
522    /// `"uniswap_v3"`). Use [`build_with_pending`](Self::build_with_pending) to obtain both
523    /// the confirmed stream and the pending processor.
524    ///
525    /// Returns an error if `extractor` names a VM protocol (prefix `"vm:"`), which requires
526    /// `update_engine()` and cannot be simulated natively.
527    pub fn with_pending_indexer(
528        mut self,
529        extractor: &str,
530        indexer: Box<dyn TxDeltaIndexer>,
531    ) -> Result<Self, StreamError> {
532        if extractor.starts_with("vm:") {
533            return Err(StreamError::SetUpError(format!(
534                "extractor '{extractor}' is a VM protocol; TxDeltaIndexer only supports native protocols"
535            )));
536        }
537        self.pending_indexers
538            .insert(extractor.to_string(), indexer);
539        Ok(self)
540    }
541
542    /// Enables controlled-step mode for testing.
543    ///
544    /// Returns a [`BlockStepController`] that lets the caller decide when each buffered block
545    /// is released for decoding. Call this before [`build`](Self::build) or
546    /// [`build_with_pending`](Self::build_with_pending) — both detect and wire up the gating
547    /// automatically.
548    ///
549    /// In production code, do not call this method; the stream runs at full speed.
550    pub fn with_step_controller(mut self) -> (Self, BlockStepController) {
551        let (trigger_tx, trigger_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
552        let (peek_tx, peek_rx) =
553            tokio::sync::watch::channel::<Option<FeedMessage<BlockHeader>>>(None);
554
555        self.step_peek_tx = Some(peek_tx);
556        self.step_trigger_rx = Some(trigger_rx);
557
558        let controller = BlockStepController { trigger_tx, peek_rx };
559        (self, controller)
560    }
561
562    /// Spawns a background task that gates `FeedMessage` delivery.
563    ///
564    /// The task buffers each incoming message, publishes it to `peek_tx` so the
565    /// [`BlockStepController`] can inspect it, waits for a trigger, then forwards the message to
566    /// `output_tx` for the decode pipeline. If `advance_tx` is `Some`, a clone of the message is
567    /// also forwarded there (used by the pending-processor path) before the decode step.
568    /// When the input channel closes or a terminal error is received according to
569    /// `stream_end_policy`, the task exits and all output channels are dropped.
570    fn run_gating_task(
571        raw_rx: tokio::sync::mpsc::Receiver<
572            Result<FeedMessage<BlockHeader>, BlockSynchronizerError>,
573        >,
574        mut trigger_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
575        peek_tx: tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>,
576        output_tx: tokio::sync::mpsc::Sender<FeedMessage<BlockHeader>>,
577        stream_end_policy: StreamEndPolicy,
578    ) {
579        tokio::spawn(async move {
580            let mut raw_stream = ReceiverStream::new(raw_rx);
581            loop {
582                let msg = match raw_stream.next().await {
583                    Some(Ok(msg)) => msg,
584                    Some(Err(e)) => {
585                        error!("Block stream ended with terminal error: {e}");
586                        break;
587                    }
588                    None => break,
589                };
590
591                if stream_end_policy.should_end(msg.sync_states.values()) {
592                    error!(
593                        "Block stream ended due to {:?}: {:?}",
594                        stream_end_policy, msg.sync_states
595                    );
596                    break;
597                }
598
599                // Publish the buffered message so the caller can peek before triggering.
600                let _ = peek_tx.send(Some(msg.clone()));
601
602                // Block until the controller fires trigger_next_block(), or until it is dropped.
603                if trigger_rx.recv().await.is_none() {
604                    // Controller dropped — forward the buffered message and drain the rest
605                    // without gating, so the stream continues to its natural end.
606                    let _ = peek_tx.send(None);
607                    if output_tx.send(msg).await.is_err() {
608                        break;
609                    }
610                    while let Some(item) = raw_stream.next().await {
611                        let Ok(msg) = item else { break };
612                        if stream_end_policy.should_end(msg.sync_states.values()) {
613                            break;
614                        }
615                        if output_tx.send(msg).await.is_err() {
616                            break;
617                        }
618                    }
619                    break;
620                }
621
622                // Clear the peek slot before decoding so callers see None between blocks.
623                let _ = peek_tx.send(None);
624
625                if output_tx.send(msg).await.is_err() {
626                    break;
627                }
628            }
629        });
630    }
631
632    /// Builds the confirmed protocol stream and a [`PendingBlockProcessor`] that stays
633    /// in sync with it automatically.
634    ///
635    /// The stream pipeline forwards every confirmed [`FeedMessage`] to the processor via an
636    /// internal unbounded channel — it never blocks waiting for the consumer. The consumer
637    /// owns the returned `PendingBlockProcessor` exclusively and may wrap it in whatever
638    /// synchronisation primitive suits their use case (e.g. `Mutex` for shared access,
639    /// nothing for single-threaded use).
640    ///
641    /// Call [`generate_pending_update`](PendingBlockProcessor::generate_pending_update) to
642    /// simulate a candidate bundle; it drains the channel automatically before computing.
643    pub async fn build_with_pending(
644        self,
645    ) -> Result<
646        (impl Stream<Item = Result<Update, StreamDecodeError>>, PendingBlockProcessor),
647        StreamError,
648    > {
649        initialize_hook_handlers().map_err(|e| {
650            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
651        })?;
652        let (_, rx) = self.stream_builder.build().await?;
653        let decoder = Arc::new(self.decoder);
654
655        let (advance_tx, advance_rx) =
656            tokio::sync::mpsc::unbounded_channel::<FeedMessage<BlockHeader>>();
657        let pending = PendingBlockProcessor::new(
658            self.pending_indexers,
659            decoder.clone(),
660            self.chain,
661            advance_rx,
662        );
663
664        let chain = self.chain;
665        let stream_end_policy = self.stream_end_policy;
666
667        let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
668            if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
669                let (gated_tx, gated_rx) =
670                    tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
671                Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
672                Box::new(ReceiverStream::new(gated_rx))
673            } else {
674                let normal = ReceiverStream::new(rx)
675                    .take_while(move |msg| match msg {
676                        Ok(msg) => {
677                            let states = msg.sync_states.values();
678                            if stream_end_policy.should_end(states) {
679                                error!(
680                                    "Block stream ended due to {:?}: {:?}",
681                                    stream_end_policy, msg.sync_states
682                                );
683                                futures::future::ready(false)
684                            } else {
685                                futures::future::ready(true)
686                            }
687                        }
688                        Err(e) => {
689                            error!("Block stream ended with terminal error: {e}");
690                            futures::future::ready(false)
691                        }
692                    })
693                    .map(|msg| msg.expect("Safe since stream ends if we receive an error"));
694                Box::new(Box::pin(normal))
695            };
696
697        let stream = Box::pin(decode_stream.then({
698            let decoder = decoder.clone();
699            move |msg| {
700                let decoder = decoder.clone();
701                let advance_tx = advance_tx.clone();
702                async move {
703                    let _ = advance_tx.send(msg.clone());
704                    decoder.decode(&msg).await.map_err(|e| {
705                        debug!(msg=?msg, "Decode error: {}", e);
706                        e
707                    })
708                }
709            }
710        }));
711        let stream = inject_native_wrapper(stream, chain);
712        Ok((stream, pending))
713    }
714
715    /// Builds and returns the configured protocol stream.
716    ///
717    /// See the module-level docs for details on stream behavior and emitted messages.
718    /// This method applies all builder settings and starts the stream.
719    pub async fn build(
720        self,
721    ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
722        initialize_hook_handlers().map_err(|e| {
723            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
724        })?;
725        let (_, rx) = self.stream_builder.build().await?;
726        let decoder = Arc::new(self.decoder);
727        let chain = self.chain;
728        let stream_end_policy = self.stream_end_policy;
729
730        let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
731            if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
732                let (gated_tx, gated_rx) =
733                    tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
734                Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
735                Box::new(ReceiverStream::new(gated_rx))
736            } else {
737                let normal = ReceiverStream::new(rx)
738                    .take_while(move |msg| match msg {
739                        Ok(msg) => {
740                            let states = msg.sync_states.values();
741                            if stream_end_policy.should_end(states) {
742                                error!(
743                                    "Block stream ended due to {:?}: {:?}",
744                                    stream_end_policy, msg.sync_states
745                                );
746                                futures::future::ready(false)
747                            } else {
748                                futures::future::ready(true)
749                            }
750                        }
751                        Err(e) => {
752                            error!("Block stream ended with terminal error: {e}");
753                            futures::future::ready(false)
754                        }
755                    })
756                    .map(|msg| msg.expect("Safe since stream ends if we receive an error"));
757                Box::new(Box::pin(normal))
758            };
759
760        let stream = Box::pin(decode_stream.then({
761            let decoder = decoder.clone();
762            move |msg| {
763                let decoder = decoder.clone();
764                async move {
765                    decoder.decode(&msg).await.map_err(|e| {
766                        debug!(msg=?msg, "Decode error: {}", e);
767                        e
768                    })
769                }
770            }
771        }));
772        let stream = inject_native_wrapper(stream, chain);
773        Ok(stream)
774    }
775}
776
777/// Wraps a decoded protocol stream to inject a `NativeWrapperState` component
778/// on the first successful update.
779///
780/// Skips injection for chains where the native and wrapped-native tokens share
781/// the same address (e.g. Starknet).
782fn inject_native_wrapper(
783    inner: impl Stream<Item = Result<Update, StreamDecodeError>> + Unpin + Send + 'static,
784    chain: Chain,
785) -> impl Stream<Item = Result<Update, StreamDecodeError>> + Send {
786    let has_distinct_wrapper = chain.native_token().address != chain.wrapped_native_token().address;
787    if !has_distinct_wrapper {
788        return Either::Left(inner);
789    }
790
791    Either::Right(
792        stream::once(async move {
793            let mut inner = inner;
794            let first = inner.next().await;
795            let modified = first.into_iter().map(move |result| {
796                result.map(|mut update| {
797                    let component = NativeWrapperState::component(chain);
798                    let id = component.id.to_string();
799                    update
800                        .new_pairs
801                        .insert(id.clone(), component);
802                    update
803                        .states
804                        .insert(id, Box::new(NativeWrapperState::new(chain)));
805                    debug!("Injected native_wrapper component for {chain}");
806                    update
807                })
808            });
809            stream::iter(modified).chain(inner)
810        })
811        .flatten(),
812    )
813}
814
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{stream, StreamExt};
    use tycho_common::models::Chain;

    use super::*;
    use crate::protocol::models::Update;

    /// Builds an `Update` for `block` from two empty maps.
    fn empty_update(block: u64) -> Update {
        Update::new(block, HashMap::new(), HashMap::new())
    }

    /// Feeds three successful updates through `inject_native_wrapper` and checks that
    /// the native wrapper component and state show up in the first one and in none of
    /// the later ones.
    #[tokio::test]
    async fn test_inject_native_wrapper_first_message_only() {
        let updates = vec![Ok(empty_update(1)), Ok(empty_update(2)), Ok(empty_update(3))];
        let input = stream::iter(updates);

        let results: Vec<_> = inject_native_wrapper(input, Chain::Ethereum)
            .collect()
            .await;

        assert_eq!(results.len(), 3);

        let expected_id = NativeWrapperState::component(Chain::Ethereum)
            .id
            .to_string();

        let first = results[0]
            .as_ref()
            .expect("first update ok");
        assert!(
            first
                .new_pairs
                .contains_key(&expected_id),
            "first message should have native_wrapper component"
        );
        assert!(
            first.states.contains_key(&expected_id),
            "first message should have native_wrapper state"
        );

        // Every message after the first must be passed through unmodified, not just
        // the second one.
        for (idx, later) in results.iter().enumerate().skip(1) {
            let later = later
                .as_ref()
                .expect("later update ok");
            assert!(
                !later
                    .new_pairs
                    .contains_key(&expected_id),
                "message at index {idx} should NOT have native_wrapper component"
            );
            assert!(
                !later.states.contains_key(&expected_id),
                "message at index {idx} should NOT have native_wrapper state"
            );
        }
    }

    /// Verifies that `with_step_controller` returns both a modified builder and a controller.
    ///
    /// This test only checks that the builder method is callable and that the returned
    /// controller compiles. The stream is never built here.
    #[tokio::test]
    async fn test_with_step_controller_returns_controller() {
        let builder = ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum);
        let (_builder, controller) = builder.with_step_controller();
        // The controller was successfully returned — verifying the public API is callable.
        drop(controller);
    }

    /// Connects to a live Tycho instance, verifies that the stream blocks until
    /// `trigger_next_block` is called, and that `peek_next_block` exposes the buffered message.
    #[ignore = "requires live Tycho connection (TYCHO_AUTH_TOKEN env var)"]
    #[tokio::test]
    async fn test_step_controller_trigger_releases_block() {
        use std::{env, time::Duration};

        use crate::evm::protocol::uniswap_v2::state::UniswapV2State;

        let auth = env::var("TYCHO_AUTH_TOKEN").expect("TYCHO_AUTH_TOKEN must be set");

        // Track a single well-known pool to minimise startup latency.
        let usdc_weth_v2 = "0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc".to_string();
        let (builder, controller) =
            ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
                .auth_key(Some(auth))
                .exchange::<UniswapV2State>(
                    "uniswap_v2",
                    ComponentFilter::Ids(vec![usdc_weth_v2]),
                    None,
                )
                .with_step_controller();

        let (stream, _pending) = builder
            .build_with_pending()
            .await
            .expect("build_with_pending failed");
        tokio::pin!(stream);

        // Wait up to 60 s for the first block to arrive in the gating buffer.
        let peeked = tokio::time::timeout(Duration::from_secs(60), controller.peek_next_block())
            .await
            .expect("timed out waiting for first block to buffer")
            .expect("stream ended before a block arrived");

        assert!(!peeked.sync_states.is_empty(), "peeked block should carry sync states");

        // Stream must be empty before we trigger — the gating task should be holding the block.
        let pre_trigger = tokio::time::timeout(Duration::from_millis(200), stream.next()).await;
        assert!(
            pre_trigger.is_err(),
            "stream should be blocked before trigger_next_block, got an item"
        );

        // Release the block.
        controller
            .trigger_next_block()
            .expect("trigger_next_block failed");

        // Stream should now yield the decoded update within the 30 s timeout.
        let update = tokio::time::timeout(Duration::from_secs(30), stream.next())
            .await
            .expect("timed out waiting for update after trigger")
            .expect("stream ended unexpectedly");

        assert!(update.is_ok(), "decoded update should be Ok, got: {:?}", update);
    }
}