tycho_simulation/evm/stream.rs
1//! Builder for configuring a multi-protocol stream.
2//!
//! Provides a builder for creating a multi-protocol stream that produces
//! protocol state update messages. It runs one synchronization worker per protocol
//! and a supervisor that aggregates updates, ensuring gap-free streaming
//! and robust state tracking.
7//!
8//! ## Context
9//!
10//! This stream wraps a `TychoStream` from `tycho-client`. It decodes `FeedMessage`s
11//! into protocol state updates. Internally, each protocol runs in its own
12//! synchronization worker, and a supervisor aggregates their messages per block.
13//!
14//! ### Protocol Synchronization Worker
15//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
16//! - It first downloads components and their snapshots.
17//! - It then streams deltas.
18//! - It reacts to new or paused components by pulling snapshots or removing them from the active
19//! set.
20//!
21//! Each worker emits snapshots and deltas to the supervisor.
22//!
23//! ### Stream Supervisor
24//! The supervisor aggregates worker messages by block and assigns sync status.
25//! - It ensures workers produce gap-free messages.
26//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
27//! - It marks workers with terminal errors as `Ended`.
28//!
//! Aggregating by block adds a small amount of latency, since the supervisor waits briefly for
//! all workers to emit. This latency only applies to workers in the `Ready` or `Delayed` state.
31//!
32//! The stream ends only when **all** workers are `Stale` or `Ended`.
33//!
34//! ## Configuration
35//!
36//! The builder lets you customize:
37//!
38//! ### Protocols
39//! Select which protocols to synchronize.
40//!
41//! ### Tokens & Minimum Token Quality
42//! Provide token metadata up front so the decoder can initialize protocol states from startup
43//! snapshots. `set_tokens` does not act as an ongoing filter — components arriving after startup
44//! include their own token metadata. To restrict processing to specific tokens, apply that filter
45//! in your consumer when reading `new_components`. New tokens arriving via stream deltas are added
46//! automatically when their quality exceeds `min_token_quality`.
47//!
48//! ### StreamEndPolicy
49//! Control when the stream ends based on worker states. By default, it ends when all
50//! workers are `Stale` or `Ended`.
51//!
52//! ## Stream
53//! The stream emits one protocol state update every `block_time`. Each update
54//! reports protocol synchronization states and any changes.
55//!
56//! The `new_components` field lists newly deployed components and their tokens.
57//!
58//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
59//! most errors, so users should rarely need to restart it manually.
60//!
61//! ## Example
62//! ```no_run
63//! use tycho_common::models::Chain;
64//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
65//! use tycho_simulation::utils::load_all_tokens;
66//! use futures::StreamExt;
67//! use tycho_client::feed::component_tracker::ComponentFilter;
68//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
69//!
70//! #[tokio::main]
71//! async fn main() {
72//! let all_tokens = load_all_tokens(
73//! "tycho-beta.propellerheads.xyz",
74//! false,
75//! Some("sampletoken"),
76//! true,
77//! Chain::Ethereum,
78//! None,
79//! None,
80//! )
81//! .await
82//! .expect("Failed loading tokens");
83//!
84//! let protocol_stream =
85//! ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
86//! .auth_key(Some("sampletoken".to_string()))
87//! .skip_state_decode_failures(true)
88//! .exchange::<UniswapV2State>(
89//! "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
90//! )
91//! .set_tokens(all_tokens)
92//! .await
93//! .build()
94//! .await
95//! .expect("Failed building protocol stream");
96//! tokio::pin!(protocol_stream);
97//!
98//! // Loop through block updates
99//! while let Some(msg) = protocol_stream.next().await {
100//! dbg!(msg).expect("failed decoding");
101//! }
102//! }
103//! ```
104use std::{
105 collections::{HashMap, HashSet},
106 sync::Arc,
107 time,
108};
109
110use futures::{future::Either, stream, Stream, StreamExt};
111use tokio_stream::wrappers::ReceiverStream;
112use tracing::{debug, error, warn};
113use tycho_client::{
114 feed::{
115 component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
116 BlockSynchronizerError, FeedMessage, SynchronizerState,
117 },
118 stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
119};
120use tycho_common::{
121 models::{token::Token, Chain},
122 simulation::protocol_sim::ProtocolSim,
123 traits::TxDeltaIndexer,
124 Bytes,
125};
126
127use crate::{
128 evm::{
129 decoder::{StreamDecodeError, TychoStreamDecoder},
130 pending::PendingBlockProcessor,
131 protocol::{
132 native_wrapper::state::NativeWrapperState,
133 uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
134 },
135 },
136 protocol::{
137 errors::InvalidSnapshotError,
138 models::{DecoderContext, TryFromWithBlock, Update},
139 },
140 utils::default_blocklist,
141};
142
// NOTE(review): the `exchange` docs also list "uniswap_v4" among the protocols that need a
// `filter_fn`, but it is not in this list — confirm whether the warning should cover it too.
/// Exchanges for which not all pools are supported; registering one of them without a
/// `filter_fn` makes the builder log a warning.
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
144
/// Policy deciding when the protocol stream stops, based on the synchronizer states reported
/// with each block.
///
/// Independently of the selected policy, the stream always ends once every synchronizer is
/// `Stale` or `Ended`; the non-default variants only allow ending it earlier.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamEndPolicy {
    /// End stream if all states are Stale or Ended (default)
    #[default]
    AllEndedOrStale,
    /// End stream if any protocol ended
    AnyEnded,
    /// End stream if any protocol ended or is stale
    AnyEndedOrStale,
    /// End stream if any protocol is stale
    AnyStale,
}
157
158impl StreamEndPolicy {
159 fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
160 let mut it = states.into_iter();
161 match self {
162 StreamEndPolicy::AllEndedOrStale => false,
163 StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
164 StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
165 StreamEndPolicy::AnyEndedOrStale => {
166 it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
167 }
168 }
169 }
170}
171
/// Handle returned by [`ProtocolStreamBuilder::with_step_controller`] that gives external
/// control over when each buffered block is released for decoding.
///
/// Intended for complex test scenarios where the caller needs to observe what the next
/// block contains before allowing the decoder pipeline to process it.
///
/// ## Trigger semantics
///
/// Triggers travel over an unbounded channel and the gating task consumes one trigger per
/// block. Calling [`trigger_next_block`](Self::trigger_next_block) before a block is buffered
/// therefore pre-authorises that future block: it is released as soon as it arrives, so the
/// caller may not get a chance to observe it via peek.
///
/// ## Drop behaviour
///
/// Dropping this controller ungates the stream: the gating task detects the closed trigger
/// channel, forwards the currently-buffered block (if any), then continues passing subsequent
/// blocks through without waiting for triggers — exactly as if step-control had never been
/// enabled. The stream runs to its natural end.
pub struct BlockStepController {
    /// Sends a trigger signal to release the next buffered block.
    trigger_tx: tokio::sync::mpsc::UnboundedSender<()>,
    /// Watch channel containing the next buffered raw message, or `None` if no block is pending.
    peek_rx: tokio::sync::watch::Receiver<Option<FeedMessage<BlockHeader>>>,
}
190
191impl BlockStepController {
192 /// Releases the next buffered block for decoding and emission.
193 ///
194 /// Returns an error if the stream has already ended and the sender is disconnected.
195 pub fn trigger_next_block(&self) -> Result<(), tokio::sync::mpsc::error::SendError<()>> {
196 // Send a unit value on the trigger channel to unblock the gating task.
197 self.trigger_tx.send(())
198 }
199
200 /// Returns the currently buffered block immediately, or `None` if no block is buffered yet.
201 pub fn try_peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
202 self.peek_rx.borrow().clone()
203 }
204
205 /// Waits until a block is buffered and returns it without consuming it.
206 ///
207 /// Returns `None` only if the stream has ended and no further blocks will arrive.
208 /// If a block is already buffered when this is called, it returns immediately.
209 pub async fn peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
210 // Clone so we don't hold a mutable borrow on self; wait_for checks the current
211 // value first, so this returns immediately if a block is already present.
212 let mut rx = self.peek_rx.clone();
213 let guard = rx
214 .wait_for(|v| v.is_some())
215 .await
216 .ok()?;
217 guard.clone()
218 }
219}
220
/// Builds and configures the multi protocol stream described in the [module-level docs](self).
///
/// See the module documentation for details on protocols, configuration options, and
/// stream behavior.
pub struct ProtocolStreamBuilder {
    /// Decoder turning raw feed messages into [`Update`]s; per-exchange decoders and filters are
    /// registered on it by the `exchange*` methods.
    decoder: TychoStreamDecoder<BlockHeader>,
    /// Underlying `tycho-client` stream builder receiving connection and sync settings.
    stream_builder: TychoStreamBuilder,
    /// Policy checked on every received block to decide whether to end the stream early.
    stream_end_policy: StreamEndPolicy,
    /// Chain the stream is built for.
    chain: Chain,
    /// Pending-block indexers keyed by extractor name, handed to the
    /// [`PendingBlockProcessor`] by `build_with_pending()`.
    pending_indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
    /// Watch sender used to publish the currently-buffered raw block so the controller can peek
    /// at it before triggering. `Some` iff step-control mode is active.
    step_peek_tx: Option<tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>>,
    /// Receiver half of the trigger channel. Held here until `build()` / `build_with_pending()`
    /// transfers ownership to the gating task. `Some` iff step-control mode is active.
    step_trigger_rx: Option<tokio::sync::mpsc::UnboundedReceiver<()>>,
}
238
239impl ProtocolStreamBuilder {
240 /// Creates a new builder for a multi-protocol stream.
241 ///
242 /// The shipped pool blocklist is applied by default, excluding components known to break
243 /// simulation. Use [`blocklist_components`](Self::blocklist_components) to exclude additional
244 /// components.
245 ///
246 /// See the [module-level docs](self) for full details on stream behavior and configuration.
247 pub fn new(tycho_url: &str, chain: Chain) -> Self {
248 Self {
249 decoder: TychoStreamDecoder::new(),
250 stream_builder: TychoStreamBuilder::new(tycho_url, chain)
251 .blocklisted_ids(default_blocklist()),
252 stream_end_policy: StreamEndPolicy::default(),
253 chain,
254 pending_indexers: HashMap::new(),
255 step_peek_tx: None,
256 step_trigger_rx: None,
257 }
258 }
259
260 /// Adds a specific exchange to the stream.
261 ///
262 /// This configures the builder to include a new protocol synchronizer for `name`,
263 /// filtering its components according to `filter` and optionally `filter_fn`.
264 ///
265 /// The type parameter `T` specifies the decoder type for this exchange. All
266 /// component states for this exchange will be decoded into instances of `T`.
267 ///
268 /// # Parameters
269 ///
270 /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
271 /// - `filter`: Defines the set of components to include in the stream.
272 /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
273 /// expressible in `filter`.
274 ///
275 /// # Notes
276 ///
277 /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
278 /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
279 /// filter function is required to ensure correct decoding and quoting logic.
280 pub fn exchange<T>(
281 mut self,
282 name: &str,
283 filter: ComponentFilter,
284 filter_fn: Option<fn(&ComponentWithState) -> bool>,
285 ) -> Self
286 where
287 T: ProtocolSim
288 + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
289 + Send
290 + 'static,
291 {
292 self.stream_builder = self
293 .stream_builder
294 .exchange(name, filter);
295 self.decoder.register_decoder::<T>(name);
296 if let Some(predicate) = filter_fn {
297 self.decoder
298 .register_filter(name, predicate);
299 }
300
301 if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
302 warn!(
303 "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
304 name
305 );
306 }
307
308 self
309 }
310
311 /// Adds a specific exchange to the stream with decoder context.
312 ///
313 /// This configures the builder to include a new protocol synchronizer for `name`,
314 /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
315 /// the DecoderContext (this is useful to test protocols that are not live yet)
316 ///
317 /// The type parameter `T` specifies the decoder type for this exchange. All
318 /// component states for this exchange will be decoded into instances of `T`.
319 ///
320 /// # Parameters
321 ///
322 /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
323 /// - `filter`: Defines the set of components to include in the stream.
324 /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
325 /// expressible in `filter`.
326 /// - `decoder_context`: The decoder context for this exchange
327 ///
328 /// # Notes
329 ///
330 /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
331 /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
332 /// filter function is required to ensure correct decoding and quoting logic.
333 pub fn exchange_with_decoder_context<T>(
334 mut self,
335 name: &str,
336 filter: ComponentFilter,
337 filter_fn: Option<fn(&ComponentWithState) -> bool>,
338 decoder_context: DecoderContext,
339 ) -> Self
340 where
341 T: ProtocolSim
342 + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
343 + Send
344 + 'static,
345 {
346 self.stream_builder = self
347 .stream_builder
348 .exchange(name, filter);
349 self.decoder
350 .register_decoder_with_context::<T>(name, decoder_context);
351 if let Some(predicate) = filter_fn {
352 self.decoder
353 .register_filter(name, predicate);
354 }
355
356 if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
357 warn!(
358 "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
359 name
360 );
361 }
362
363 self
364 }
365
366 /// Sets the block time interval for the stream.
367 ///
368 /// This controls how often the stream produces updates.
369 pub fn block_time(mut self, block_time: u64) -> Self {
370 self.stream_builder = self
371 .stream_builder
372 .block_time(block_time);
373 self
374 }
375
376 /// Sets the network operation timeout (deprecated).
377 ///
378 /// Use [`latency_buffer()`](Self::latency_buffer) instead for controlling latency.
379 /// This method is retained for backwards compatibility.
380 #[deprecated = "Use latency_buffer instead"]
381 pub fn timeout(mut self, timeout: u64) -> Self {
382 self.stream_builder = self.stream_builder.timeout(timeout);
383 self
384 }
385
386 /// Sets the latency buffer to aggregate same-block messages.
387 ///
388 /// This allows the supervisor to wait a short interval for all synchronizers to emit
389 /// before aggregating.
390 pub fn latency_buffer(mut self, timeout: u64) -> Self {
391 self.stream_builder = self.stream_builder.timeout(timeout);
392 self
393 }
394
395 /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
396 pub fn max_missed_blocks(mut self, n: u64) -> Self {
397 self.stream_builder = self.stream_builder.max_missed_blocks(n);
398 self
399 }
400
401 /// Sets how long a synchronizer may take to process the initial message.
402 ///
403 /// Useful for data-intensive protocols where startup decoding takes longer.
404 pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
405 self.stream_builder = self
406 .stream_builder
407 .startup_timeout(timeout);
408 self
409 }
410
411 /// Configures the stream to exclude state updates.
412 ///
413 /// This reduces bandwidth and decoding workload if protocol state is not of
414 /// interest (e.g. only process new tokens).
415 pub fn no_state(mut self, no_state: bool) -> Self {
416 self.stream_builder = self.stream_builder.no_state(no_state);
417 self
418 }
419
420 /// Sets the API key for authenticating with the Tycho server.
421 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
422 self.stream_builder = self.stream_builder.auth_key(auth_key);
423 self
424 }
425
426 /// Disables TLS/ SSL for the connection, using http and ws protocols.
427 ///
428 /// This is not recommended for production use.
429 pub fn no_tls(mut self, no_tls: bool) -> Self {
430 self.stream_builder = self.stream_builder.no_tls(no_tls);
431 self
432 }
433
434 /// Disable compression for the connection.
435 pub fn disable_compression(mut self) -> Self {
436 self.stream_builder = self
437 .stream_builder
438 .disable_compression();
439 self
440 }
441
442 /// Enables partial block updates (flashblocks).
443 pub fn enable_partial_blocks(mut self) -> Self {
444 self.stream_builder = self
445 .stream_builder
446 .enable_partial_blocks();
447 self
448 }
449
450 /// Exclude additional component IDs from all registered exchanges.
451 ///
452 /// These IDs are added to the shipped blocklist that is already applied by default (see
453 /// [`new`](Self::new)).
454 pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
455 if !ids.is_empty() {
456 tracing::info!("Blocklisting {} components", ids.len());
457 self.stream_builder = self.stream_builder.blocklisted_ids(ids);
458 }
459 self
460 }
461
462 /// Sets the stream end policy.
463 ///
464 /// Controls when the stream should stop based on synchronizer states.
465 ///
466 /// ## Note
467 /// The stream always ends latest if all protocols are stale or ended independent of
468 /// this configuration. This allows you to end the stream earlier than that.
469 ///
470 /// See [self::StreamEndPolicy] for possible configuration options.
471 pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
472 self.stream_end_policy = stream_end_policy;
473 self
474 }
475
476 /// Provides token metadata used to decode startup snapshots and initialize protocol states.
477 ///
478 /// This is not a stream filter — components arriving after startup include their own token
479 /// metadata. To restrict to specific tokens, filter in your consumer logic. New tokens
480 /// arriving via stream deltas are added automatically if they meet the quality threshold.
481 pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
482 self.decoder.set_tokens(tokens).await;
483 self
484 }
485
486 /// Skips decoding errors for component state updates.
487 ///
488 /// Allows the stream to continue processing even if some states fail to decode,
489 /// logging a warning instead of panicking.
490 pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
491 self.decoder
492 .skip_state_decode_failures(skip);
493 self
494 }
495
496 /// Sets the minimum token quality for tokens added via the stream.
497 ///
498 /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
499 /// Set this to the same value used in [`load_all_tokens()`](crate::utils::load_all_tokens) to
500 /// apply consistent filtering.
501 pub fn min_token_quality(mut self, quality: u32) -> Self {
502 self.decoder.min_token_quality(quality);
503 self
504 }
505
506 /// Configures the retry policy for websocket reconnects.
507 pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
508 self.stream_builder = self
509 .stream_builder
510 .websockets_retry_config(config);
511 self
512 }
513
514 /// Configures the retry policy for state synchronization.
515 pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
516 self.stream_builder = self
517 .stream_builder
518 .state_synchronizer_retry_config(config);
519 self
520 }
521
522 pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
523 &self.decoder
524 }
525
526 /// Registers a [`TxDeltaIndexer`] for ephemeral pending-block simulation.
527 ///
528 /// The indexer is associated with `extractor` (the protocol synchronizer name, e.g.
529 /// `"uniswap_v3"`). Use [`build_with_pending`](Self::build_with_pending) to obtain both
530 /// the confirmed stream and the pending processor.
531 ///
532 /// Returns an error if `extractor` names a VM protocol (prefix `"vm:"`), which requires
533 /// `update_engine()` and cannot be simulated natively.
534 pub fn with_pending_indexer(
535 mut self,
536 extractor: &str,
537 indexer: Box<dyn TxDeltaIndexer>,
538 ) -> Result<Self, StreamError> {
539 if extractor.starts_with("vm:") {
540 return Err(StreamError::SetUpError(format!(
541 "extractor '{extractor}' is a VM protocol; TxDeltaIndexer only supports native protocols"
542 )));
543 }
544 self.pending_indexers
545 .insert(extractor.to_string(), indexer);
546 Ok(self)
547 }
548
549 /// Enables controlled-step mode for testing.
550 ///
551 /// Returns a [`BlockStepController`] that lets the caller decide when each buffered block
552 /// is released for decoding. Call this before [`build`](Self::build) or
553 /// [`build_with_pending`](Self::build_with_pending) — both detect and wire up the gating
554 /// automatically.
555 ///
556 /// In production code, do not call this method; the stream runs at full speed.
557 pub fn with_step_controller(mut self) -> (Self, BlockStepController) {
558 let (trigger_tx, trigger_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
559 let (peek_tx, peek_rx) =
560 tokio::sync::watch::channel::<Option<FeedMessage<BlockHeader>>>(None);
561
562 self.step_peek_tx = Some(peek_tx);
563 self.step_trigger_rx = Some(trigger_rx);
564
565 let controller = BlockStepController { trigger_tx, peek_rx };
566 (self, controller)
567 }
568
569 /// Spawns a background task that gates `FeedMessage` delivery.
570 ///
571 /// The task buffers each incoming message, publishes it to `peek_tx` so the
572 /// [`BlockStepController`] can inspect it, waits for a trigger, then forwards the message to
573 /// `output_tx` for the decode pipeline. If `advance_tx` is `Some`, a clone of the message is
574 /// also forwarded there (used by the pending-processor path) before the decode step.
575 /// When the input channel closes or a terminal error is received according to
576 /// `stream_end_policy`, the task exits and all output channels are dropped.
577 fn run_gating_task(
578 raw_rx: tokio::sync::mpsc::Receiver<
579 Result<FeedMessage<BlockHeader>, BlockSynchronizerError>,
580 >,
581 mut trigger_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
582 peek_tx: tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>,
583 output_tx: tokio::sync::mpsc::Sender<FeedMessage<BlockHeader>>,
584 stream_end_policy: StreamEndPolicy,
585 ) {
586 tokio::spawn(async move {
587 let mut raw_stream = ReceiverStream::new(raw_rx);
588 loop {
589 let msg = match raw_stream.next().await {
590 Some(Ok(msg)) => msg,
591 Some(Err(e)) => {
592 error!("Block stream ended with terminal error: {e}");
593 break;
594 }
595 None => break,
596 };
597
598 if stream_end_policy.should_end(msg.sync_states.values()) {
599 error!(
600 "Block stream ended due to {:?}: {:?}",
601 stream_end_policy, msg.sync_states
602 );
603 break;
604 }
605
606 // Publish the buffered message so the caller can peek before triggering.
607 let _ = peek_tx.send(Some(msg.clone()));
608
609 // Block until the controller fires trigger_next_block(), or until it is dropped.
610 if trigger_rx.recv().await.is_none() {
611 // Controller dropped — forward the buffered message and drain the rest
612 // without gating, so the stream continues to its natural end.
613 let _ = peek_tx.send(None);
614 if output_tx.send(msg).await.is_err() {
615 break;
616 }
617 while let Some(item) = raw_stream.next().await {
618 let Ok(msg) = item else { break };
619 if stream_end_policy.should_end(msg.sync_states.values()) {
620 break;
621 }
622 if output_tx.send(msg).await.is_err() {
623 break;
624 }
625 }
626 break;
627 }
628
629 // Clear the peek slot before decoding so callers see None between blocks.
630 let _ = peek_tx.send(None);
631
632 if output_tx.send(msg).await.is_err() {
633 break;
634 }
635 }
636 });
637 }
638
639 /// Builds the confirmed protocol stream and a [`PendingBlockProcessor`] that stays
640 /// in sync with it automatically.
641 ///
642 /// The stream pipeline forwards every confirmed [`FeedMessage`] to the processor via an
643 /// internal unbounded channel — it never blocks waiting for the consumer. The consumer
644 /// owns the returned `PendingBlockProcessor` exclusively and may wrap it in whatever
645 /// synchronisation primitive suits their use case (e.g. `Mutex` for shared access,
646 /// nothing for single-threaded use).
647 ///
648 /// Call [`generate_pending_update`](PendingBlockProcessor::generate_pending_update) to
649 /// simulate a candidate bundle; it drains the channel automatically before computing.
650 pub async fn build_with_pending(
651 self,
652 ) -> Result<
653 (impl Stream<Item = Result<Update, StreamDecodeError>>, PendingBlockProcessor),
654 StreamError,
655 > {
656 initialize_hook_handlers().map_err(|e| {
657 StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
658 })?;
659 let (_, rx) = self.stream_builder.build().await?;
660 let decoder = Arc::new(self.decoder);
661
662 let (advance_tx, advance_rx) =
663 tokio::sync::mpsc::unbounded_channel::<FeedMessage<BlockHeader>>();
664 let pending = PendingBlockProcessor::new(
665 self.pending_indexers,
666 decoder.clone(),
667 self.chain,
668 advance_rx,
669 );
670
671 let chain = self.chain;
672 let stream_end_policy = self.stream_end_policy;
673
674 let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
675 if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
676 let (gated_tx, gated_rx) =
677 tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
678 Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
679 Box::new(ReceiverStream::new(gated_rx))
680 } else {
681 let normal = ReceiverStream::new(rx)
682 .take_while(move |msg| match msg {
683 Ok(msg) => {
684 let states = msg.sync_states.values();
685 if stream_end_policy.should_end(states) {
686 error!(
687 "Block stream ended due to {:?}: {:?}",
688 stream_end_policy, msg.sync_states
689 );
690 futures::future::ready(false)
691 } else {
692 futures::future::ready(true)
693 }
694 }
695 Err(e) => {
696 error!("Block stream ended with terminal error: {e}");
697 futures::future::ready(false)
698 }
699 })
700 .map(|msg| msg.expect("Safe since stream ends if we receive an error"));
701 Box::new(Box::pin(normal))
702 };
703
704 let stream = Box::pin(decode_stream.then({
705 let decoder = decoder.clone();
706 move |msg| {
707 let decoder = decoder.clone();
708 let advance_tx = advance_tx.clone();
709 async move {
710 let _ = advance_tx.send(msg.clone());
711 decoder.decode(&msg).await.map_err(|e| {
712 debug!(msg=?msg, "Decode error: {}", e);
713 e
714 })
715 }
716 }
717 }));
718 let stream = inject_native_wrapper(stream, chain);
719 Ok((stream, pending))
720 }
721
722 /// Builds and returns the configured protocol stream.
723 ///
724 /// See the module-level docs for details on stream behavior and emitted messages.
725 /// This method applies all builder settings and starts the stream.
726 pub async fn build(
727 self,
728 ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
729 initialize_hook_handlers().map_err(|e| {
730 StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
731 })?;
732 let (_, rx) = self.stream_builder.build().await?;
733 let decoder = Arc::new(self.decoder);
734 let chain = self.chain;
735 let stream_end_policy = self.stream_end_policy;
736
737 let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
738 if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
739 let (gated_tx, gated_rx) =
740 tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
741 Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
742 Box::new(ReceiverStream::new(gated_rx))
743 } else {
744 let normal = ReceiverStream::new(rx)
745 .take_while(move |msg| match msg {
746 Ok(msg) => {
747 let states = msg.sync_states.values();
748 if stream_end_policy.should_end(states) {
749 error!(
750 "Block stream ended due to {:?}: {:?}",
751 stream_end_policy, msg.sync_states
752 );
753 futures::future::ready(false)
754 } else {
755 futures::future::ready(true)
756 }
757 }
758 Err(e) => {
759 error!("Block stream ended with terminal error: {e}");
760 futures::future::ready(false)
761 }
762 })
763 .map(|msg| msg.expect("Safe since stream ends if we receive an error"));
764 Box::new(Box::pin(normal))
765 };
766
767 let stream = Box::pin(decode_stream.then({
768 let decoder = decoder.clone();
769 move |msg| {
770 let decoder = decoder.clone();
771 async move {
772 decoder.decode(&msg).await.map_err(|e| {
773 debug!(msg=?msg, "Decode error: {}", e);
774 e
775 })
776 }
777 }
778 }));
779 let stream = inject_native_wrapper(stream, chain);
780 Ok(stream)
781 }
782}
783
784/// Wraps a decoded protocol stream to inject a `NativeWrapperState` component
785/// on the first successful update.
786///
787/// Skips injection for chains where the native and wrapped-native tokens share
788/// the same address (e.g. Starknet).
789fn inject_native_wrapper(
790 inner: impl Stream<Item = Result<Update, StreamDecodeError>> + Unpin + Send + 'static,
791 chain: Chain,
792) -> impl Stream<Item = Result<Update, StreamDecodeError>> + Send {
793 let has_distinct_wrapper = chain.native_token().address != chain.wrapped_native_token().address;
794 if !has_distinct_wrapper {
795 return Either::Left(inner);
796 }
797
798 Either::Right(
799 stream::once(async move {
800 let mut inner = inner;
801 let first = inner.next().await;
802 let modified = first.into_iter().map(move |result| {
803 result.map(|mut update| {
804 let component = NativeWrapperState::component(chain);
805 let id = component.id.to_string();
806 update
807 .new_pairs
808 .insert(id.clone(), component);
809 update
810 .states
811 .insert(id, Box::new(NativeWrapperState::new(chain)));
812 debug!("Injected native_wrapper component for {chain}");
813 update
814 })
815 });
816 stream::iter(modified).chain(inner)
817 })
818 .flatten(),
819 )
820}
821
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{stream, StreamExt};
    use tycho_common::models::Chain;

    use super::*;
    use crate::protocol::models::Update;

    /// Returns an `Update` for `block` constructed from two empty maps.
    fn empty_update(block: u64) -> Update {
        Update::new(block, HashMap::new(), HashMap::new())
    }

    /// Checks that `inject_native_wrapper` forwards every update and adds the native wrapper
    /// component and state to the first update only.
    #[tokio::test]
    async fn test_inject_native_wrapper_first_message_only() {
        let updates = vec![Ok(empty_update(1)), Ok(empty_update(2)), Ok(empty_update(3))];
        let input = stream::iter(updates);

        let results: Vec<_> = inject_native_wrapper(input, Chain::Ethereum)
            .collect()
            .await;

        // No update may be added or dropped by the wrapper.
        assert_eq!(results.len(), 3);

        let expected_id = NativeWrapperState::component(Chain::Ethereum)
            .id
            .to_string();

        let first = results[0]
            .as_ref()
            .expect("first update ok");
        assert!(
            first
                .new_pairs
                .contains_key(&expected_id),
            "first message should have native_wrapper component"
        );
        assert!(
            first.states.contains_key(&expected_id),
            "first message should have native_wrapper state"
        );

        let second = results[1]
            .as_ref()
            .expect("second update ok");
        assert!(
            !second
                .new_pairs
                .contains_key(&expected_id),
            "second message should NOT have native_wrapper component"
        );
        assert!(
            !second.states.contains_key(&expected_id),
            "second message should NOT have native_wrapper state"
        );
    }

    /// Verifies that `with_step_controller` returns both a modified builder and a controller.
    ///
    /// This test only checks that the builder method is callable and that the returned controller
    /// compiles — it does not start any network connection.
    #[tokio::test]
    async fn test_with_step_controller_returns_controller() {
        let builder = ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum);
        let (_builder, controller) = builder.with_step_controller();
        // The controller was successfully returned — verifying the public API is callable.
        drop(controller);
    }

    /// Connects to a live Tycho instance, verifies that the stream blocks until
    /// `trigger_next_block` is called, and that `peek_next_block` exposes the buffered message.
    #[ignore = "requires live Tycho connection (TYCHO_AUTH_TOKEN env var)"]
    #[tokio::test]
    async fn test_step_controller_trigger_releases_block() {
        use std::{env, time::Duration};

        use crate::evm::protocol::uniswap_v2::state::UniswapV2State;

        let auth = env::var("TYCHO_AUTH_TOKEN").expect("TYCHO_AUTH_TOKEN must be set");

        // Track a single well-known pool to minimise startup latency.
        let usdc_weth_v2 = "0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc".to_string();
        let (builder, controller) =
            ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
                .auth_key(Some(auth))
                .exchange::<UniswapV2State>(
                    "uniswap_v2",
                    ComponentFilter::Ids(vec![usdc_weth_v2]),
                    None,
                )
                .with_step_controller();

        let (stream, _pending) = builder
            .build_with_pending()
            .await
            .expect("build_with_pending failed");
        tokio::pin!(stream);

        // Wait up to 60 s for the first block to arrive in the gating buffer.
        let peeked = tokio::time::timeout(Duration::from_secs(60), controller.peek_next_block())
            .await
            .expect("timed out waiting for first block to buffer")
            .expect("stream ended before a block arrived");

        assert!(!peeked.sync_states.is_empty(), "peeked block should carry sync states");

        // Stream must be empty before we trigger — the gating task should be holding the block.
        let pre_trigger = tokio::time::timeout(Duration::from_millis(200), stream.next()).await;
        assert!(
            pre_trigger.is_err(),
            "stream should be blocked before trigger_next_block, got an item"
        );

        // Release the block.
        controller
            .trigger_next_block()
            .expect("trigger_next_block failed");

        // Stream should now yield the decoded update within one block time.
        let update = tokio::time::timeout(Duration::from_secs(30), stream.next())
            .await
            .expect("timed out waiting for update after trigger")
            .expect("stream ended unexpectedly");

        assert!(update.is_ok(), "decoded update should be Ok, got: {:?}", update);
    }
}