Skip to main content

tycho_simulation/evm/
stream.rs

1//! Builder for configuring a multi-protocol stream.
2//!
3//! Provides a builder for creating a multi-protocol stream that produces
4//! protocol state update messages. It runs one synchronization worker per protocol
5//! and a supervisor that aggregates updates, ensuring gap‑free streaming
6//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps a `TychoStream` from `tycho-client`. It decodes `FeedMessage`s
11//! into protocol state updates. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//!   set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
29//! Aggregating by block adds small latency, since the supervisor waits briefly for
30//! all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
31//!
32//! The stream ends only when **all** workers are `Stale` or `Ended`.
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide token metadata up front so the decoder can initialize protocol states from startup
43//! snapshots. `set_tokens` does not act as an ongoing filter — components arriving after startup
44//! include their own token metadata. To restrict processing to specific tokens, apply that filter
45//! in your consumer when reading `new_components`. New tokens arriving via stream deltas are added
46//! automatically when their quality exceeds `min_token_quality`.
47//!
48//! ### StreamEndPolicy
49//! Control when the stream ends based on worker states. By default, it ends when all
50//! workers are `Stale` or `Ended`.
51//!
52//! ## Stream
53//! The stream emits one protocol state update every `block_time`. Each update
54//! reports protocol synchronization states and any changes.
55//!
56//! The `new_components` field lists newly deployed components and their tokens.
57//!
58//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
59//! most errors, so users should rarely need to restart it manually.
60//!
61//! ## Example
62//! ```no_run
63//! use tycho_common::models::Chain;
64//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
65//! use tycho_simulation::utils::load_all_tokens;
66//! use futures::StreamExt;
67//! use tycho_client::feed::component_tracker::ComponentFilter;
68//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
69//!
70//! #[tokio::main]
71//! async fn main() {
72//!     let all_tokens = load_all_tokens(
73//!         "tycho-beta.propellerheads.xyz",
74//!         false,
75//!         Some("sampletoken"),
76//!         true,
77//!         Chain::Ethereum,
78//!         None,
79//!         None,
80//!     )
81//!     .await
82//!     .expect("Failed loading tokens");
83//!
84//!     let protocol_stream =
85//!         ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
86//!             .auth_key(Some("sampletoken".to_string()))
87//!             .skip_state_decode_failures(true)
88//!             .exchange::<UniswapV2State>(
89//!                 "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
90//!             )
91//!             .set_tokens(all_tokens)
92//!             .await
93//!             .build()
94//!             .await
95//!             .expect("Failed building protocol stream");
96//!     tokio::pin!(protocol_stream);
97//!
98//!     // Loop through block updates
99//!     while let Some(msg) = protocol_stream.next().await {
100//!         dbg!(msg).expect("failed decoding");
101//!     }
102//! }
103//! ```
104use std::{
105    collections::{HashMap, HashSet},
106    sync::Arc,
107    time,
108};
109
110use futures::{future::Either, stream, Stream, StreamExt};
111use tokio_stream::wrappers::ReceiverStream;
112use tracing::{debug, error, warn};
113use tycho_client::{
114    feed::{
115        component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
116        BlockSynchronizerError, FeedMessage, SynchronizerState,
117    },
118    stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
119};
120use tycho_common::{
121    models::{token::Token, Chain},
122    simulation::protocol_sim::ProtocolSim,
123    traits::TxDeltaIndexer,
124    Bytes,
125};
126
127use crate::{
128    evm::{
129        decoder::{StreamDecodeError, TychoStreamDecoder},
130        pending::PendingBlockProcessor,
131        protocol::{
132            native_wrapper::state::NativeWrapperState,
133            uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
134        },
135    },
136    protocol::{
137        errors::InvalidSnapshotError,
138        models::{DecoderContext, TryFromWithBlock, Update},
139    },
140    utils::default_blocklist,
141};
142
/// Exchange names for which the builder logs a warning when they are registered without a
/// client-side `filter_fn` (checked in `ProtocolStreamBuilder::exchange` and
/// `ProtocolStreamBuilder::exchange_with_decoder_context`).
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
144
/// Policy describing which synchronizer states cause the stream to be ended early.
///
/// The policy is evaluated against the `sync_states` of every incoming feed message via
/// `StreamEndPolicy::should_end`.
#[derive(Default, Debug, Clone, Copy)]
pub enum StreamEndPolicy {
    /// End stream if all states are Stale or Ended (default)
    ///
    /// `should_end` always returns `false` for this variant, i.e. no early termination is
    /// requested by the policy check itself.
    #[default]
    AllEndedOrStale,
    /// End stream if any protocol ended
    AnyEnded,
    /// End stream if any protocol ended or is stale
    AnyEndedOrStale,
    /// End stream if any protocol is stale
    AnyStale,
}
157
158impl StreamEndPolicy {
159    fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
160        let mut it = states.into_iter();
161        match self {
162            StreamEndPolicy::AllEndedOrStale => false,
163            StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
164            StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
165            StreamEndPolicy::AnyEndedOrStale => {
166                it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
167            }
168        }
169    }
170}
171
/// Handle returned by [`ProtocolStreamBuilder::with_step_controller`] that gives external
/// control over when each buffered block is released for decoding.
///
/// Intended for complex test scenarios where the caller needs to observe what the next
/// block contains before allowing the decoder pipeline to process it.
///
/// ## Drop behaviour
///
/// Dropping this controller ungates the stream: the gating task detects the closed trigger
/// channel, forwards the currently-buffered block (if any), then continues passing subsequent
/// blocks through without waiting for triggers — exactly as if step-control had never been
/// enabled. The stream runs to its natural end.
pub struct BlockStepController {
    /// Sends a trigger signal to release the next buffered block.
    ///
    /// The gating task consumes one trigger per block it releases.
    trigger_tx: tokio::sync::mpsc::UnboundedSender<()>,
    /// Watch channel containing the next buffered raw message, or `None` if no block is pending.
    ///
    /// The gating task resets the value to `None` once the buffered block has been released.
    peek_rx: tokio::sync::watch::Receiver<Option<FeedMessage<BlockHeader>>>,
}
190
191impl BlockStepController {
192    /// Releases the next buffered block for decoding and emission.
193    ///
194    /// Returns an error if the stream has already ended and the sender is disconnected.
195    pub fn trigger_next_block(&self) -> Result<(), tokio::sync::mpsc::error::SendError<()>> {
196        // Send a unit value on the trigger channel to unblock the gating task.
197        self.trigger_tx.send(())
198    }
199
200    /// Returns the currently buffered block immediately, or `None` if no block is buffered yet.
201    pub fn try_peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
202        self.peek_rx.borrow().clone()
203    }
204
205    /// Waits until a block is buffered and returns it without consuming it.
206    ///
207    /// Returns `None` only if the stream has ended and no further blocks will arrive.
208    /// If a block is already buffered when this is called, it returns immediately.
209    pub async fn peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
210        // Clone so we don't hold a mutable borrow on self; wait_for checks the current
211        // value first, so this returns immediately if a block is already present.
212        let mut rx = self.peek_rx.clone();
213        let guard = rx
214            .wait_for(|v| v.is_some())
215            .await
216            .ok()?;
217        guard.clone()
218    }
219}
220
/// Builds and configures the multi protocol stream described in the [module-level docs](self).
///
/// See the module documentation for details on protocols, configuration options, and
/// stream behavior.
pub struct ProtocolStreamBuilder {
    /// Decoder on which per-exchange decoders, filters and tokens are registered; it is wrapped
    /// in an `Arc` when the stream is built.
    decoder: TychoStreamDecoder<BlockHeader>,
    /// Underlying `tycho-client` stream builder; the connection and synchronization settings of
    /// this builder are forwarded to it.
    stream_builder: TychoStreamBuilder,
    /// Policy checked against the `sync_states` of every feed message to decide whether the
    /// stream must end early.
    stream_end_policy: StreamEndPolicy,
    /// Chain the stream is built for; also handed to the pending processor and the native
    /// wrapper injection at build time.
    chain: Chain,
    /// Pending-block indexers keyed by extractor name, registered through
    /// `with_pending_indexer` and consumed by `build_with_pending`.
    pending_indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
    /// Watch sender used to publish the currently-buffered raw block so the controller can peek
    /// at it before triggering. `Some` iff step-control mode is active.
    step_peek_tx: Option<tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>>,
    /// Receiver half of the trigger channel. Held here until `build()` / `build_with_pending()`
    /// transfers ownership to the gating task. `Some` iff step-control mode is active.
    step_trigger_rx: Option<tokio::sync::mpsc::UnboundedReceiver<()>>,
}
238
239impl ProtocolStreamBuilder {
240    /// Creates a new builder for a multi-protocol stream.
241    ///
242    /// The shipped pool blocklist is applied by default, excluding components known to break
243    /// simulation. Use [`blocklist_components`](Self::blocklist_components) to exclude additional
244    /// components.
245    ///
246    /// See the [module-level docs](self) for full details on stream behavior and configuration.
247    pub fn new(tycho_url: &str, chain: Chain) -> Self {
248        Self {
249            decoder: TychoStreamDecoder::new(),
250            stream_builder: TychoStreamBuilder::new(tycho_url, chain)
251                .blocklisted_ids(default_blocklist()),
252            stream_end_policy: StreamEndPolicy::default(),
253            chain,
254            pending_indexers: HashMap::new(),
255            step_peek_tx: None,
256            step_trigger_rx: None,
257        }
258    }
259
260    /// Adds a specific exchange to the stream.
261    ///
262    /// This configures the builder to include a new protocol synchronizer for `name`,
263    /// filtering its components according to `filter` and optionally `filter_fn`.
264    ///
265    /// The type parameter `T` specifies the decoder type for this exchange. All
266    /// component states for this exchange will be decoded into instances of `T`.
267    ///
268    /// # Parameters
269    ///
270    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
271    /// - `filter`: Defines the set of components to include in the stream.
272    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
273    ///   expressible in `filter`.
274    ///
275    /// # Notes
276    ///
277    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
278    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
279    /// filter function is required to ensure correct decoding and quoting logic.
280    pub fn exchange<T>(
281        mut self,
282        name: &str,
283        filter: ComponentFilter,
284        filter_fn: Option<fn(&ComponentWithState) -> bool>,
285    ) -> Self
286    where
287        T: ProtocolSim
288            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
289            + Send
290            + 'static,
291    {
292        self.stream_builder = self
293            .stream_builder
294            .exchange(name, filter);
295        self.decoder.register_decoder::<T>(name);
296        if let Some(predicate) = filter_fn {
297            self.decoder
298                .register_filter(name, predicate);
299        }
300
301        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
302            warn!(
303                "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
304                name
305            );
306        }
307
308        self
309    }
310
311    /// Adds a specific exchange to the stream with decoder context.
312    ///
313    /// This configures the builder to include a new protocol synchronizer for `name`,
314    /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
315    /// the DecoderContext (this is useful to test protocols that are not live yet)
316    ///
317    /// The type parameter `T` specifies the decoder type for this exchange. All
318    /// component states for this exchange will be decoded into instances of `T`.
319    ///
320    /// # Parameters
321    ///
322    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
323    /// - `filter`: Defines the set of components to include in the stream.
324    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
325    ///   expressible in `filter`.
326    /// - `decoder_context`: The decoder context for this exchange
327    ///
328    /// # Notes
329    ///
330    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
331    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
332    /// filter function is required to ensure correct decoding and quoting logic.
333    pub fn exchange_with_decoder_context<T>(
334        mut self,
335        name: &str,
336        filter: ComponentFilter,
337        filter_fn: Option<fn(&ComponentWithState) -> bool>,
338        decoder_context: DecoderContext,
339    ) -> Self
340    where
341        T: ProtocolSim
342            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
343            + Send
344            + 'static,
345    {
346        self.stream_builder = self
347            .stream_builder
348            .exchange(name, filter);
349        self.decoder
350            .register_decoder_with_context::<T>(name, decoder_context);
351        if let Some(predicate) = filter_fn {
352            self.decoder
353                .register_filter(name, predicate);
354        }
355
356        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
357            warn!(
358                "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
359                name
360            );
361        }
362
363        self
364    }
365
366    /// Sets the block time interval for the stream.
367    ///
368    /// This controls how often the stream produces updates.
369    pub fn block_time(mut self, block_time: u64) -> Self {
370        self.stream_builder = self
371            .stream_builder
372            .block_time(block_time);
373        self
374    }
375
376    /// Sets the network operation timeout (deprecated).
377    ///
378    /// Use [`latency_buffer()`](Self::latency_buffer) instead for controlling latency.
379    /// This method is retained for backwards compatibility.
380    #[deprecated = "Use latency_buffer instead"]
381    pub fn timeout(mut self, timeout: u64) -> Self {
382        self.stream_builder = self.stream_builder.timeout(timeout);
383        self
384    }
385
386    /// Sets the latency buffer to aggregate same-block messages.
387    ///
388    /// This allows the supervisor to wait a short interval for all synchronizers to emit
389    /// before aggregating.
390    pub fn latency_buffer(mut self, timeout: u64) -> Self {
391        self.stream_builder = self.stream_builder.timeout(timeout);
392        self
393    }
394
395    /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
396    pub fn max_missed_blocks(mut self, n: u64) -> Self {
397        self.stream_builder = self.stream_builder.max_missed_blocks(n);
398        self
399    }
400
401    /// Sets how long a synchronizer may take to process the initial message.
402    ///
403    /// Useful for data-intensive protocols where startup decoding takes longer.
404    pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
405        self.stream_builder = self
406            .stream_builder
407            .startup_timeout(timeout);
408        self
409    }
410
411    /// Configures the stream to exclude state updates.
412    ///
413    /// This reduces bandwidth and decoding workload if protocol state is not of
414    /// interest (e.g. only process new tokens).
415    pub fn no_state(mut self, no_state: bool) -> Self {
416        self.stream_builder = self.stream_builder.no_state(no_state);
417        self
418    }
419
420    /// Sets the API key for authenticating with the Tycho server.
421    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
422        self.stream_builder = self.stream_builder.auth_key(auth_key);
423        self
424    }
425
426    /// Disables TLS/ SSL for the connection, using http and ws protocols.
427    ///
428    /// This is not recommended for production use.
429    pub fn no_tls(mut self, no_tls: bool) -> Self {
430        self.stream_builder = self.stream_builder.no_tls(no_tls);
431        self
432    }
433
434    /// Disable compression for the connection.
435    pub fn disable_compression(mut self) -> Self {
436        self.stream_builder = self
437            .stream_builder
438            .disable_compression();
439        self
440    }
441
442    /// Enables partial block updates (flashblocks).
443    pub fn enable_partial_blocks(mut self) -> Self {
444        self.stream_builder = self
445            .stream_builder
446            .enable_partial_blocks();
447        self
448    }
449
450    /// Exclude additional component IDs from all registered exchanges.
451    ///
452    /// These IDs are added to the shipped blocklist that is already applied by default (see
453    /// [`new`](Self::new)).
454    pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
455        if !ids.is_empty() {
456            tracing::info!("Blocklisting {} components", ids.len());
457            self.stream_builder = self.stream_builder.blocklisted_ids(ids);
458        }
459        self
460    }
461
462    /// Sets the stream end policy.
463    ///
464    /// Controls when the stream should stop based on synchronizer states.
465    ///
466    /// ## Note
467    /// The stream always ends latest if all protocols are stale or ended independent of
468    /// this configuration. This allows you to end the stream earlier than that.
469    ///
470    /// See [self::StreamEndPolicy] for possible configuration options.
471    pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
472        self.stream_end_policy = stream_end_policy;
473        self
474    }
475
476    /// Provides token metadata used to decode startup snapshots and initialize protocol states.
477    ///
478    /// This is not a stream filter — components arriving after startup include their own token
479    /// metadata. To restrict to specific tokens, filter in your consumer logic. New tokens
480    /// arriving via stream deltas are added automatically if they meet the quality threshold.
481    pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
482        self.decoder.set_tokens(tokens).await;
483        self
484    }
485
486    /// Skips decoding errors for component state updates.
487    ///
488    /// Allows the stream to continue processing even if some states fail to decode,
489    /// logging a warning instead of panicking.
490    pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
491        self.decoder
492            .skip_state_decode_failures(skip);
493        self
494    }
495
496    /// Sets the minimum token quality for tokens added via the stream.
497    ///
498    /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
499    /// Set this to the same value used in [`load_all_tokens()`](crate::utils::load_all_tokens) to
500    /// apply consistent filtering.
501    pub fn min_token_quality(mut self, quality: u32) -> Self {
502        self.decoder.min_token_quality(quality);
503        self
504    }
505
506    /// Configures the retry policy for websocket reconnects.
507    pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
508        self.stream_builder = self
509            .stream_builder
510            .websockets_retry_config(config);
511        self
512    }
513
514    /// Configures the retry policy for state synchronization.
515    pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
516        self.stream_builder = self
517            .stream_builder
518            .state_synchronizer_retry_config(config);
519        self
520    }
521
522    pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
523        &self.decoder
524    }
525
526    /// Registers a [`TxDeltaIndexer`] for ephemeral pending-block simulation.
527    ///
528    /// The indexer is associated with `extractor` (the protocol synchronizer name, e.g.
529    /// `"uniswap_v3"`). Use [`build_with_pending`](Self::build_with_pending) to obtain both
530    /// the confirmed stream and the pending processor.
531    ///
532    /// Returns an error if `extractor` names a VM protocol (prefix `"vm:"`), which requires
533    /// `update_engine()` and cannot be simulated natively.
534    pub fn with_pending_indexer(
535        mut self,
536        extractor: &str,
537        indexer: Box<dyn TxDeltaIndexer>,
538    ) -> Result<Self, StreamError> {
539        if extractor.starts_with("vm:") {
540            return Err(StreamError::SetUpError(format!(
541                "extractor '{extractor}' is a VM protocol; TxDeltaIndexer only supports native protocols"
542            )));
543        }
544        self.pending_indexers
545            .insert(extractor.to_string(), indexer);
546        Ok(self)
547    }
548
549    /// Enables controlled-step mode for testing.
550    ///
551    /// Returns a [`BlockStepController`] that lets the caller decide when each buffered block
552    /// is released for decoding. Call this before [`build`](Self::build) or
553    /// [`build_with_pending`](Self::build_with_pending) — both detect and wire up the gating
554    /// automatically.
555    ///
556    /// In production code, do not call this method; the stream runs at full speed.
557    pub fn with_step_controller(mut self) -> (Self, BlockStepController) {
558        let (trigger_tx, trigger_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
559        let (peek_tx, peek_rx) =
560            tokio::sync::watch::channel::<Option<FeedMessage<BlockHeader>>>(None);
561
562        self.step_peek_tx = Some(peek_tx);
563        self.step_trigger_rx = Some(trigger_rx);
564
565        let controller = BlockStepController { trigger_tx, peek_rx };
566        (self, controller)
567    }
568
569    /// Spawns a background task that gates `FeedMessage` delivery.
570    ///
571    /// The task buffers each incoming message, publishes it to `peek_tx` so the
572    /// [`BlockStepController`] can inspect it, waits for a trigger, then forwards the message to
573    /// `output_tx` for the decode pipeline. If `advance_tx` is `Some`, a clone of the message is
574    /// also forwarded there (used by the pending-processor path) before the decode step.
575    /// When the input channel closes or a terminal error is received according to
576    /// `stream_end_policy`, the task exits and all output channels are dropped.
577    fn run_gating_task(
578        raw_rx: tokio::sync::mpsc::Receiver<
579            Result<FeedMessage<BlockHeader>, BlockSynchronizerError>,
580        >,
581        mut trigger_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
582        peek_tx: tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>,
583        output_tx: tokio::sync::mpsc::Sender<FeedMessage<BlockHeader>>,
584        stream_end_policy: StreamEndPolicy,
585    ) {
586        tokio::spawn(async move {
587            let mut raw_stream = ReceiverStream::new(raw_rx);
588            loop {
589                let msg = match raw_stream.next().await {
590                    Some(Ok(msg)) => msg,
591                    Some(Err(e)) => {
592                        error!("Block stream ended with terminal error: {e}");
593                        break;
594                    }
595                    None => break,
596                };
597
598                if stream_end_policy.should_end(msg.sync_states.values()) {
599                    error!(
600                        "Block stream ended due to {:?}: {:?}",
601                        stream_end_policy, msg.sync_states
602                    );
603                    break;
604                }
605
606                // Publish the buffered message so the caller can peek before triggering.
607                let _ = peek_tx.send(Some(msg.clone()));
608
609                // Block until the controller fires trigger_next_block(), or until it is dropped.
610                if trigger_rx.recv().await.is_none() {
611                    // Controller dropped — forward the buffered message and drain the rest
612                    // without gating, so the stream continues to its natural end.
613                    let _ = peek_tx.send(None);
614                    if output_tx.send(msg).await.is_err() {
615                        break;
616                    }
617                    while let Some(item) = raw_stream.next().await {
618                        let Ok(msg) = item else { break };
619                        if stream_end_policy.should_end(msg.sync_states.values()) {
620                            break;
621                        }
622                        if output_tx.send(msg).await.is_err() {
623                            break;
624                        }
625                    }
626                    break;
627                }
628
629                // Clear the peek slot before decoding so callers see None between blocks.
630                let _ = peek_tx.send(None);
631
632                if output_tx.send(msg).await.is_err() {
633                    break;
634                }
635            }
636        });
637    }
638
639    /// Builds the confirmed protocol stream and a [`PendingBlockProcessor`] that stays
640    /// in sync with it automatically.
641    ///
642    /// The stream pipeline forwards every confirmed [`FeedMessage`] to the processor via an
643    /// internal unbounded channel — it never blocks waiting for the consumer. The consumer
644    /// owns the returned `PendingBlockProcessor` exclusively and may wrap it in whatever
645    /// synchronisation primitive suits their use case (e.g. `Mutex` for shared access,
646    /// nothing for single-threaded use).
647    ///
648    /// Call [`generate_pending_update`](PendingBlockProcessor::generate_pending_update) to
649    /// simulate a candidate bundle; it drains the channel automatically before computing.
650    pub async fn build_with_pending(
651        self,
652    ) -> Result<
653        (impl Stream<Item = Result<Update, StreamDecodeError>>, PendingBlockProcessor),
654        StreamError,
655    > {
656        initialize_hook_handlers().map_err(|e| {
657            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
658        })?;
659        let (_, rx) = self.stream_builder.build().await?;
660        let decoder = Arc::new(self.decoder);
661
662        let (advance_tx, advance_rx) =
663            tokio::sync::mpsc::unbounded_channel::<FeedMessage<BlockHeader>>();
664        let pending = PendingBlockProcessor::new(
665            self.pending_indexers,
666            decoder.clone(),
667            self.chain,
668            advance_rx,
669        );
670
671        let chain = self.chain;
672        let stream_end_policy = self.stream_end_policy;
673
674        let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
675            if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
676                let (gated_tx, gated_rx) =
677                    tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
678                Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
679                Box::new(ReceiverStream::new(gated_rx))
680            } else {
681                let normal = ReceiverStream::new(rx)
682                    .take_while(move |msg| match msg {
683                        Ok(msg) => {
684                            let states = msg.sync_states.values();
685                            if stream_end_policy.should_end(states) {
686                                error!(
687                                    "Block stream ended due to {:?}: {:?}",
688                                    stream_end_policy, msg.sync_states
689                                );
690                                futures::future::ready(false)
691                            } else {
692                                futures::future::ready(true)
693                            }
694                        }
695                        Err(e) => {
696                            error!("Block stream ended with terminal error: {e}");
697                            futures::future::ready(false)
698                        }
699                    })
700                    .map(|msg| msg.expect("Safe since stream ends if we receive an error"));
701                Box::new(Box::pin(normal))
702            };
703
704        let stream = Box::pin(decode_stream.then({
705            let decoder = decoder.clone();
706            move |msg| {
707                let decoder = decoder.clone();
708                let advance_tx = advance_tx.clone();
709                async move {
710                    let _ = advance_tx.send(msg.clone());
711                    decoder.decode(&msg).await.map_err(|e| {
712                        debug!(msg=?msg, "Decode error: {}", e);
713                        e
714                    })
715                }
716            }
717        }));
718        let stream = inject_native_wrapper(stream, chain);
719        Ok((stream, pending))
720    }
721
722    /// Builds and returns the configured protocol stream.
723    ///
724    /// See the module-level docs for details on stream behavior and emitted messages.
725    /// This method applies all builder settings and starts the stream.
726    pub async fn build(
727        self,
728    ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
729        initialize_hook_handlers().map_err(|e| {
730            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
731        })?;
732        let (_, rx) = self.stream_builder.build().await?;
733        let decoder = Arc::new(self.decoder);
734        let chain = self.chain;
735        let stream_end_policy = self.stream_end_policy;
736
737        let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
738            if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
739                let (gated_tx, gated_rx) =
740                    tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
741                Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
742                Box::new(ReceiverStream::new(gated_rx))
743            } else {
744                let normal = ReceiverStream::new(rx)
745                    .take_while(move |msg| match msg {
746                        Ok(msg) => {
747                            let states = msg.sync_states.values();
748                            if stream_end_policy.should_end(states) {
749                                error!(
750                                    "Block stream ended due to {:?}: {:?}",
751                                    stream_end_policy, msg.sync_states
752                                );
753                                futures::future::ready(false)
754                            } else {
755                                futures::future::ready(true)
756                            }
757                        }
758                        Err(e) => {
759                            error!("Block stream ended with terminal error: {e}");
760                            futures::future::ready(false)
761                        }
762                    })
763                    .map(|msg| msg.expect("Safe since stream ends if we receive an error"));
764                Box::new(Box::pin(normal))
765            };
766
767        let stream = Box::pin(decode_stream.then({
768            let decoder = decoder.clone();
769            move |msg| {
770                let decoder = decoder.clone();
771                async move {
772                    decoder.decode(&msg).await.map_err(|e| {
773                        debug!(msg=?msg, "Decode error: {}", e);
774                        e
775                    })
776                }
777            }
778        }));
779        let stream = inject_native_wrapper(stream, chain);
780        Ok(stream)
781    }
782}
783
784/// Wraps a decoded protocol stream to inject a `NativeWrapperState` component
785/// on the first successful update.
786///
787/// Skips injection for chains where the native and wrapped-native tokens share
788/// the same address (e.g. Starknet).
789fn inject_native_wrapper(
790    inner: impl Stream<Item = Result<Update, StreamDecodeError>> + Unpin + Send + 'static,
791    chain: Chain,
792) -> impl Stream<Item = Result<Update, StreamDecodeError>> + Send {
793    let has_distinct_wrapper = chain.native_token().address != chain.wrapped_native_token().address;
794    if !has_distinct_wrapper {
795        return Either::Left(inner);
796    }
797
798    Either::Right(
799        stream::once(async move {
800            let mut inner = inner;
801            let first = inner.next().await;
802            let modified = first.into_iter().map(move |result| {
803                result.map(|mut update| {
804                    let component = NativeWrapperState::component(chain);
805                    let id = component.id.to_string();
806                    update
807                        .new_pairs
808                        .insert(id.clone(), component);
809                    update
810                        .states
811                        .insert(id, Box::new(NativeWrapperState::new(chain)));
812                    debug!("Injected native_wrapper component for {chain}");
813                    update
814                })
815            });
816            stream::iter(modified).chain(inner)
817        })
818        .flatten(),
819    )
820}
821
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{stream, StreamExt};
    use tycho_common::models::Chain;

    use super::*;
    use crate::protocol::models::Update;

    /// Builds an `Update` for `block` with two empty maps.
    fn empty_update(block: u64) -> Update {
        Update::new(block, HashMap::new(), HashMap::new())
    }

    /// Verifies that the native wrapper component and state are attached to the first update
    /// and to none of the updates that follow it.
    #[tokio::test]
    async fn test_inject_native_wrapper_first_message_only() {
        let updates = vec![Ok(empty_update(1)), Ok(empty_update(2)), Ok(empty_update(3))];
        let input = stream::iter(updates);

        let results: Vec<_> = inject_native_wrapper(input, Chain::Ethereum)
            .collect()
            .await;

        assert_eq!(results.len(), 3);

        let expected_id = NativeWrapperState::component(Chain::Ethereum)
            .id
            .to_string();

        let first = results[0]
            .as_ref()
            .expect("first update ok");
        assert!(
            first
                .new_pairs
                .contains_key(&expected_id),
            "first message should have native_wrapper component"
        );
        assert!(
            first.states.contains_key(&expected_id),
            "first message should have native_wrapper state"
        );

        // Check every later update, not just the second one, so a repeated injection anywhere
        // in the tail is caught.
        for (idx, result) in results.iter().enumerate().skip(1) {
            let later = result
                .as_ref()
                .expect("later update ok");
            assert!(
                !later
                    .new_pairs
                    .contains_key(&expected_id),
                "message {} should NOT have native_wrapper component",
                idx + 1
            );
            assert!(
                !later.states.contains_key(&expected_id),
                "message {} should NOT have native_wrapper state",
                idx + 1
            );
        }
    }

    /// Verifies that `with_step_controller` returns both a modified builder and a controller.
    ///
    /// This test only checks that the builder method is callable and that the returned controller
    /// compiles — it does not start any network connection.
    #[tokio::test]
    async fn test_with_step_controller_returns_controller() {
        let builder = ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum);
        let (_builder, controller) = builder.with_step_controller();
        // The controller was successfully returned — verifying the public API is callable.
        drop(controller);
    }

    /// Connects to a live Tycho instance, verifies that the stream blocks until
    /// `trigger_next_block` is called, and that `peek_next_block` exposes the buffered message.
    ///
    /// Ignored by default: it reads the `TYCHO_AUTH_TOKEN` environment variable and panics when
    /// the variable is not set.
    #[ignore = "requires live Tycho connection (TYCHO_AUTH_TOKEN env var)"]
    #[tokio::test]
    async fn test_step_controller_trigger_releases_block() {
        use std::{env, time::Duration};

        use crate::evm::protocol::uniswap_v2::state::UniswapV2State;

        let auth = env::var("TYCHO_AUTH_TOKEN").expect("TYCHO_AUTH_TOKEN must be set");

        // Track a single well-known pool to minimise startup latency.
        let usdc_weth_v2 = "0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc".to_string();
        let (builder, controller) =
            ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
                .auth_key(Some(auth))
                .exchange::<UniswapV2State>(
                    "uniswap_v2",
                    ComponentFilter::Ids(vec![usdc_weth_v2]),
                    None,
                )
                .with_step_controller();

        let (stream, _pending) = builder
            .build_with_pending()
            .await
            .expect("build_with_pending failed");
        // The returned stream is pinned in place so `next()` can be called on it below.
        tokio::pin!(stream);

        // Wait up to 60 s for the first block to arrive in the gating buffer.
        let peeked = tokio::time::timeout(Duration::from_secs(60), controller.peek_next_block())
            .await
            .expect("timed out waiting for first block to buffer")
            .expect("stream ended before a block arrived");

        assert!(!peeked.sync_states.is_empty(), "peeked block should carry sync states");

        // Stream must be empty before we trigger — the gating task should be holding the block.
        // A timeout (`Err`) is the expected outcome here.
        let pre_trigger = tokio::time::timeout(Duration::from_millis(200), stream.next()).await;
        assert!(
            pre_trigger.is_err(),
            "stream should be blocked before trigger_next_block, got an item"
        );

        // Release the block.
        controller
            .trigger_next_block()
            .expect("trigger_next_block failed");

        // Stream should now yield the decoded update within one block time.
        let update = tokio::time::timeout(Duration::from_secs(30), stream.next())
            .await
            .expect("timed out waiting for update after trigger")
            .expect("stream ended unexpectedly");

        assert!(update.is_ok(), "decoded update should be Ok, got: {:?}", update);
    }
}