tycho-simulation 0.310.0

Provides tools for interacting with protocol states, calculating spot prices, and quoting token swaps.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
//! Builder for configuring a multi-protocol stream.
//!
//! Provides a builder for creating a multi-protocol stream that produces
//! protocol state update messages. It runs one synchronization worker per protocol
//! and a supervisor that aggregates updates, ensuring gap‑free streaming
//! and robust state tracking.
//!
//! ## Context
//!
//! This stream wraps a `TychoStream` from `tycho-client`. It decodes `FeedMessage`s
//! into protocol state updates. Internally, each protocol runs in its own
//! synchronization worker, and a supervisor aggregates their messages per block.
//!
//! ### Protocol Synchronization Worker
//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
//! - It first downloads components and their snapshots.
//! - It then streams deltas.
//! - It reacts to new or paused components by pulling snapshots or removing them from the active
//!   set.
//!
//! Each worker emits snapshots and deltas to the supervisor.
//!
//! ### Stream Supervisor
//! The supervisor aggregates worker messages by block and assigns sync status.
//! - It ensures workers produce gap-free messages.
//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
//! - It marks workers with terminal errors as `Ended`.
//!
//! Aggregating by block adds small latency, since the supervisor waits briefly for
//! all workers to emit. This latency only applies to workers in `Ready` or `Delayed`.
//!
//! The stream ends only when **all** workers are `Stale` or `Ended`.
//!
//! ## Configuration
//!
//! The builder lets you customize:
//!
//! ### Protocols
//! Select which protocols to synchronize.
//!
//! ### Tokens & Minimum Token Quality
//! Provide token metadata up front so the decoder can initialize protocol states from startup
//! snapshots. `set_tokens` does not act as an ongoing filter — components arriving after startup
//! include their own token metadata. To restrict processing to specific tokens, apply that filter
//! in your consumer when reading `new_components`. New tokens arriving via stream deltas are added
//! automatically when their quality exceeds `min_token_quality`.
//!
//! ### StreamEndPolicy
//! Control when the stream ends based on worker states. By default, it ends when all
//! workers are `Stale` or `Ended`.
//!
//! ## Stream
//! The stream emits one protocol state update every `block_time`. Each update
//! reports protocol synchronization states and any changes.
//!
//! The `new_components` field lists newly deployed components and their tokens.
//!
//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
//! most errors, so users should rarely need to restart it manually.
//!
//! ## Example
//! ```no_run
//! use tycho_common::models::Chain;
//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
//! use tycho_simulation::utils::load_all_tokens;
//! use futures::StreamExt;
//! use tycho_client::feed::component_tracker::ComponentFilter;
//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
//! use std::collections::HashSet;
//!
//! #[tokio::main]
//! async fn main() {
//!     let all_tokens = load_all_tokens(
//!         "tycho-beta.propellerheads.xyz",
//!         false,
//!         Some("sampletoken"),
//!         true,
//!         Chain::Ethereum,
//!         None,
//!         None,
//!     )
//!     .await
//!     .expect("Failed loading tokens");
//!
//!     let protocol_stream =
//!         ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
//!             .auth_key(Some("sampletoken".to_string()))
//!             .skip_state_decode_failures(true)
//!             .exchange::<UniswapV2State>(
//!                 "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
//!             )
//!             .blocklist_components(HashSet::new())
//!             .set_tokens(all_tokens)
//!             .await
//!             .build()
//!             .await
//!             .expect("Failed building protocol stream");
//!     tokio::pin!(protocol_stream);
//!
//!     // Loop through block updates
//!     while let Some(msg) = protocol_stream.next().await {
//!         dbg!(msg).expect("failed decoding");
//!     }
//! }
//! ```
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time,
};

use futures::{future::Either, stream, Stream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, warn};
use tycho_client::{
    feed::{
        component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
        BlockSynchronizerError, FeedMessage, SynchronizerState,
    },
    stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
};
use tycho_common::{
    models::{token::Token, Chain},
    simulation::protocol_sim::ProtocolSim,
    traits::TxDeltaIndexer,
    Bytes,
};

use crate::{
    evm::{
        decoder::{StreamDecodeError, TychoStreamDecoder},
        pending::PendingBlockProcessor,
        protocol::{
            native_wrapper::state::NativeWrapperState,
            uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
        },
    },
    protocol::{
        errors::InvalidSnapshotError,
        models::{DecoderContext, TryFromWithBlock, Update},
    },
};

const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];

#[derive(Default, Debug, Clone, Copy)]
pub enum StreamEndPolicy {
    /// End stream if all states are Stale or Ended (default)
    #[default]
    AllEndedOrStale,
    /// End stream if any protocol ended
    AnyEnded,
    /// End stream if any protocol ended or is stale
    AnyEndedOrStale,
    /// End stream if any protocol is stale
    AnyStale,
}

impl StreamEndPolicy {
    fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
        let mut it = states.into_iter();
        match self {
            StreamEndPolicy::AllEndedOrStale => false,
            StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
            StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
            StreamEndPolicy::AnyEndedOrStale => {
                it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
            }
        }
    }
}

/// Handle returned by [`ProtocolStreamBuilder::with_step_controller`] that gives external
/// control over when each buffered block is released for decoding.
///
/// Intended for complex test scenarios where the caller needs to observe what the next
/// block contains before allowing the decoder pipeline to process it.
///
/// ## Drop behaviour
///
/// Dropping this controller ungates the stream: the gating task detects the closed trigger
/// channel, forwards the currently-buffered block (if any), then continues passing subsequent
/// blocks through without waiting for triggers — exactly as if step-control had never been
/// enabled. The stream runs to its natural end.
pub struct BlockStepController {
    /// Sends a trigger signal to release the next buffered block.
    trigger_tx: tokio::sync::mpsc::UnboundedSender<()>,
    /// Watch channel containing the next buffered raw message, or `None` if no block is pending.
    peek_rx: tokio::sync::watch::Receiver<Option<FeedMessage<BlockHeader>>>,
}

impl BlockStepController {
    /// Releases the next buffered block for decoding and emission.
    ///
    /// Returns an error if the stream has already ended and the sender is disconnected.
    pub fn trigger_next_block(&self) -> Result<(), tokio::sync::mpsc::error::SendError<()>> {
        // Send a unit value on the trigger channel to unblock the gating task.
        self.trigger_tx.send(())
    }

    /// Returns the currently buffered block immediately, or `None` if no block is buffered yet.
    pub fn try_peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
        self.peek_rx.borrow().clone()
    }

    /// Waits until a block is buffered and returns it without consuming it.
    ///
    /// Returns `None` only if the stream has ended and no further blocks will arrive.
    /// If a block is already buffered when this is called, it returns immediately.
    pub async fn peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
        // Clone so we don't hold a mutable borrow on self; wait_for checks the current
        // value first, so this returns immediately if a block is already present.
        let mut rx = self.peek_rx.clone();
        let guard = rx
            .wait_for(|v| v.is_some())
            .await
            .ok()?;
        guard.clone()
    }
}

/// Builds and configures the multi protocol stream described in the [module-level docs](self).
///
/// See the module documentation for details on protocols, configuration options, and
/// stream behavior.
pub struct ProtocolStreamBuilder {
    decoder: TychoStreamDecoder<BlockHeader>,
    stream_builder: TychoStreamBuilder,
    stream_end_policy: StreamEndPolicy,
    chain: Chain,
    pending_indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
    /// Watch sender used to publish the currently-buffered raw block so the controller can peek
    /// at it before triggering. `Some` iff step-control mode is active.
    step_peek_tx: Option<tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>>,
    /// Receiver half of the trigger channel. Held here until `build()` / `build_with_pending()`
    /// transfers ownership to the gating task. `Some` iff step-control mode is active.
    step_trigger_rx: Option<tokio::sync::mpsc::UnboundedReceiver<()>>,
}

impl ProtocolStreamBuilder {
    /// Creates a new builder for a multi-protocol stream.
    ///
    /// See the [module-level docs](self) for full details on stream behavior and configuration.
    pub fn new(tycho_url: &str, chain: Chain) -> Self {
        Self {
            decoder: TychoStreamDecoder::new(),
            stream_builder: TychoStreamBuilder::new(tycho_url, chain),
            stream_end_policy: StreamEndPolicy::default(),
            chain,
            pending_indexers: HashMap::new(),
            step_peek_tx: None,
            step_trigger_rx: None,
        }
    }

    /// Adds a specific exchange to the stream.
    ///
    /// This configures the builder to include a new protocol synchronizer for `name`,
    /// filtering its components according to `filter` and optionally `filter_fn`.
    ///
    /// The type parameter `T` specifies the decoder type for this exchange. All
    /// component states for this exchange will be decoded into instances of `T`.
    ///
    /// # Parameters
    ///
    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
    /// - `filter`: Defines the set of components to include in the stream.
    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
    ///   expressible in `filter`.
    ///
    /// # Notes
    ///
    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
    /// filter function is required to ensure correct decoding and quoting logic.
    pub fn exchange<T>(
        mut self,
        name: &str,
        filter: ComponentFilter,
        filter_fn: Option<fn(&ComponentWithState) -> bool>,
    ) -> Self
    where
        T: ProtocolSim
            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
            + Send
            + 'static,
    {
        self.stream_builder = self
            .stream_builder
            .exchange(name, filter);
        self.decoder.register_decoder::<T>(name);
        if let Some(predicate) = filter_fn {
            self.decoder
                .register_filter(name, predicate);
        }

        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
            warn!(
                "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
                name
            );
        }

        self
    }

    /// Adds a specific exchange to the stream with decoder context.
    ///
    /// This configures the builder to include a new protocol synchronizer for `name`,
    /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
    /// the DecoderContext (this is useful to test protocols that are not live yet)
    ///
    /// The type parameter `T` specifies the decoder type for this exchange. All
    /// component states for this exchange will be decoded into instances of `T`.
    ///
    /// # Parameters
    ///
    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
    /// - `filter`: Defines the set of components to include in the stream.
    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
    ///   expressible in `filter`.
    /// - `decoder_context`: The decoder context for this exchange
    ///
    /// # Notes
    ///
    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
    /// filter function is required to ensure correct decoding and quoting logic.
    pub fn exchange_with_decoder_context<T>(
        mut self,
        name: &str,
        filter: ComponentFilter,
        filter_fn: Option<fn(&ComponentWithState) -> bool>,
        decoder_context: DecoderContext,
    ) -> Self
    where
        T: ProtocolSim
            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
            + Send
            + 'static,
    {
        self.stream_builder = self
            .stream_builder
            .exchange(name, filter);
        self.decoder
            .register_decoder_with_context::<T>(name, decoder_context);
        if let Some(predicate) = filter_fn {
            self.decoder
                .register_filter(name, predicate);
        }

        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
            warn!(
                "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
                name
            );
        }

        self
    }

    /// Sets the block time interval for the stream.
    ///
    /// This controls how often the stream produces updates.
    pub fn block_time(mut self, block_time: u64) -> Self {
        self.stream_builder = self
            .stream_builder
            .block_time(block_time);
        self
    }

    /// Sets the network operation timeout (deprecated).
    ///
    /// Use [`latency_buffer()`](Self::latency_buffer) instead for controlling latency.
    /// This method is retained for backwards compatibility.
    #[deprecated = "Use latency_buffer instead"]
    pub fn timeout(mut self, timeout: u64) -> Self {
        self.stream_builder = self.stream_builder.timeout(timeout);
        self
    }

    /// Sets the latency buffer to aggregate same-block messages.
    ///
    /// This allows the supervisor to wait a short interval for all synchronizers to emit
    /// before aggregating.
    pub fn latency_buffer(mut self, timeout: u64) -> Self {
        self.stream_builder = self.stream_builder.timeout(timeout);
        self
    }

    /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
    pub fn max_missed_blocks(mut self, n: u64) -> Self {
        self.stream_builder = self.stream_builder.max_missed_blocks(n);
        self
    }

    /// Sets how long a synchronizer may take to process the initial message.
    ///
    /// Useful for data-intensive protocols where startup decoding takes longer.
    pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
        self.stream_builder = self
            .stream_builder
            .startup_timeout(timeout);
        self
    }

    /// Configures the stream to exclude state updates.
    ///
    /// This reduces bandwidth and decoding workload if protocol state is not of
    /// interest (e.g. only process new tokens).
    pub fn no_state(mut self, no_state: bool) -> Self {
        self.stream_builder = self.stream_builder.no_state(no_state);
        self
    }

    /// Sets the API key for authenticating with the Tycho server.
    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
        self.stream_builder = self.stream_builder.auth_key(auth_key);
        self
    }

    /// Disables TLS/ SSL for the connection, using http and ws protocols.
    ///
    /// This is not recommended for production use.
    pub fn no_tls(mut self, no_tls: bool) -> Self {
        self.stream_builder = self.stream_builder.no_tls(no_tls);
        self
    }

    /// Disable compression for the connection.
    pub fn disable_compression(mut self) -> Self {
        self.stream_builder = self
            .stream_builder
            .disable_compression();
        self
    }

    /// Enables partial block updates (flashblocks).
    pub fn enable_partial_blocks(mut self) -> Self {
        self.stream_builder = self
            .stream_builder
            .enable_partial_blocks();
        self
    }

    /// Exclude specific component IDs from all registered exchanges.
    pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
        if !ids.is_empty() {
            tracing::info!("Blocklisting {} components", ids.len());
            self.stream_builder = self.stream_builder.blocklisted_ids(ids);
        }
        self
    }

    /// Sets the stream end policy.
    ///
    /// Controls when the stream should stop based on synchronizer states.
    ///
    /// ## Note
    /// The stream always ends latest if all protocols are stale or ended independent of
    /// this configuration. This allows you to end the stream earlier than that.
    ///
    /// See [self::StreamEndPolicy] for possible configuration options.
    pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
        self.stream_end_policy = stream_end_policy;
        self
    }

    /// Provides token metadata used to decode startup snapshots and initialize protocol states.
    ///
    /// This is not a stream filter — components arriving after startup include their own token
    /// metadata. To restrict to specific tokens, filter in your consumer logic. New tokens
    /// arriving via stream deltas are added automatically if they meet the quality threshold.
    pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
        self.decoder.set_tokens(tokens).await;
        self
    }

    /// Skips decoding errors for component state updates.
    ///
    /// Allows the stream to continue processing even if some states fail to decode,
    /// logging a warning instead of panicking.
    pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
        self.decoder
            .skip_state_decode_failures(skip);
        self
    }

    /// Sets the minimum token quality for tokens added via the stream.
    ///
    /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
    /// Set this to the same value used in [`load_all_tokens()`](crate::utils::load_all_tokens) to
    /// apply consistent filtering.
    pub fn min_token_quality(mut self, quality: u32) -> Self {
        self.decoder.min_token_quality(quality);
        self
    }

    /// Configures the retry policy for websocket reconnects.
    pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
        self.stream_builder = self
            .stream_builder
            .websockets_retry_config(config);
        self
    }

    /// Configures the retry policy for state synchronization.
    pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
        self.stream_builder = self
            .stream_builder
            .state_synchronizer_retry_config(config);
        self
    }

    pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
        &self.decoder
    }

    /// Registers a [`TxDeltaIndexer`] for ephemeral pending-block simulation.
    ///
    /// The indexer is associated with `extractor` (the protocol synchronizer name, e.g.
    /// `"uniswap_v3"`). Use [`build_with_pending`](Self::build_with_pending) to obtain both
    /// the confirmed stream and the pending processor.
    ///
    /// Returns an error if `extractor` names a VM protocol (prefix `"vm:"`), which requires
    /// `update_engine()` and cannot be simulated natively.
    pub fn with_pending_indexer(
        mut self,
        extractor: &str,
        indexer: Box<dyn TxDeltaIndexer>,
    ) -> Result<Self, StreamError> {
        if extractor.starts_with("vm:") {
            return Err(StreamError::SetUpError(format!(
                "extractor '{extractor}' is a VM protocol; TxDeltaIndexer only supports native protocols"
            )));
        }
        self.pending_indexers
            .insert(extractor.to_string(), indexer);
        Ok(self)
    }

    /// Enables controlled-step mode for testing.
    ///
    /// Returns a [`BlockStepController`] that lets the caller decide when each buffered block
    /// is released for decoding. Call this before [`build`](Self::build) or
    /// [`build_with_pending`](Self::build_with_pending) — both detect and wire up the gating
    /// automatically.
    ///
    /// In production code, do not call this method; the stream runs at full speed.
    pub fn with_step_controller(mut self) -> (Self, BlockStepController) {
        let (trigger_tx, trigger_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
        let (peek_tx, peek_rx) =
            tokio::sync::watch::channel::<Option<FeedMessage<BlockHeader>>>(None);

        self.step_peek_tx = Some(peek_tx);
        self.step_trigger_rx = Some(trigger_rx);

        let controller = BlockStepController { trigger_tx, peek_rx };
        (self, controller)
    }

    /// Spawns a background task that gates `FeedMessage` delivery.
    ///
    /// The task buffers each incoming message, publishes it to `peek_tx` so the
    /// [`BlockStepController`] can inspect it, waits for a trigger, then forwards the message to
    /// `output_tx` for the decode pipeline. If `advance_tx` is `Some`, a clone of the message is
    /// also forwarded there (used by the pending-processor path) before the decode step.
    /// When the input channel closes or a terminal error is received according to
    /// `stream_end_policy`, the task exits and all output channels are dropped.
    fn run_gating_task(
        raw_rx: tokio::sync::mpsc::Receiver<
            Result<FeedMessage<BlockHeader>, BlockSynchronizerError>,
        >,
        mut trigger_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
        peek_tx: tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>,
        output_tx: tokio::sync::mpsc::Sender<FeedMessage<BlockHeader>>,
        stream_end_policy: StreamEndPolicy,
    ) {
        tokio::spawn(async move {
            let mut raw_stream = ReceiverStream::new(raw_rx);
            loop {
                let msg = match raw_stream.next().await {
                    Some(Ok(msg)) => msg,
                    Some(Err(e)) => {
                        error!("Block stream ended with terminal error: {e}");
                        break;
                    }
                    None => break,
                };

                if stream_end_policy.should_end(msg.sync_states.values()) {
                    error!(
                        "Block stream ended due to {:?}: {:?}",
                        stream_end_policy, msg.sync_states
                    );
                    break;
                }

                // Publish the buffered message so the caller can peek before triggering.
                let _ = peek_tx.send(Some(msg.clone()));

                // Block until the controller fires trigger_next_block(), or until it is dropped.
                if trigger_rx.recv().await.is_none() {
                    // Controller dropped — forward the buffered message and drain the rest
                    // without gating, so the stream continues to its natural end.
                    let _ = peek_tx.send(None);
                    if output_tx.send(msg).await.is_err() {
                        break;
                    }
                    while let Some(item) = raw_stream.next().await {
                        let Ok(msg) = item else { break };
                        if stream_end_policy.should_end(msg.sync_states.values()) {
                            break;
                        }
                        if output_tx.send(msg).await.is_err() {
                            break;
                        }
                    }
                    break;
                }

                // Clear the peek slot before decoding so callers see None between blocks.
                let _ = peek_tx.send(None);

                if output_tx.send(msg).await.is_err() {
                    break;
                }
            }
        });
    }

    /// Builds the confirmed protocol stream and a [`PendingBlockProcessor`] that stays
    /// in sync with it automatically.
    ///
    /// The stream pipeline forwards every confirmed [`FeedMessage`] to the processor via an
    /// internal unbounded channel — it never blocks waiting for the consumer. The consumer
    /// owns the returned `PendingBlockProcessor` exclusively and may wrap it in whatever
    /// synchronisation primitive suits their use case (e.g. `Mutex` for shared access,
    /// nothing for single-threaded use).
    ///
    /// Call [`generate_pending_update`](PendingBlockProcessor::generate_pending_update) to
    /// simulate a candidate bundle; it drains the channel automatically before computing.
    pub async fn build_with_pending(
        self,
    ) -> Result<
        (impl Stream<Item = Result<Update, StreamDecodeError>>, PendingBlockProcessor),
        StreamError,
    > {
        initialize_hook_handlers().map_err(|e| {
            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
        })?;
        let (_, rx) = self.stream_builder.build().await?;
        let decoder = Arc::new(self.decoder);

        let (advance_tx, advance_rx) =
            tokio::sync::mpsc::unbounded_channel::<FeedMessage<BlockHeader>>();
        let pending = PendingBlockProcessor::new(
            self.pending_indexers,
            decoder.clone(),
            self.chain,
            advance_rx,
        );

        let chain = self.chain;
        let stream_end_policy = self.stream_end_policy;

        let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
            if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
                let (gated_tx, gated_rx) =
                    tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
                Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
                Box::new(ReceiverStream::new(gated_rx))
            } else {
                let normal = ReceiverStream::new(rx)
                    .take_while(move |msg| match msg {
                        Ok(msg) => {
                            let states = msg.sync_states.values();
                            if stream_end_policy.should_end(states) {
                                error!(
                                    "Block stream ended due to {:?}: {:?}",
                                    stream_end_policy, msg.sync_states
                                );
                                futures::future::ready(false)
                            } else {
                                futures::future::ready(true)
                            }
                        }
                        Err(e) => {
                            error!("Block stream ended with terminal error: {e}");
                            futures::future::ready(false)
                        }
                    })
                    .map(|msg| msg.expect("Safe since stream ends if we receive an error"));
                Box::new(Box::pin(normal))
            };

        let stream = Box::pin(decode_stream.then({
            let decoder = decoder.clone();
            move |msg| {
                let decoder = decoder.clone();
                let advance_tx = advance_tx.clone();
                async move {
                    let _ = advance_tx.send(msg.clone());
                    decoder.decode(&msg).await.map_err(|e| {
                        debug!(msg=?msg, "Decode error: {}", e);
                        e
                    })
                }
            }
        }));
        let stream = inject_native_wrapper(stream, chain);
        Ok((stream, pending))
    }

    /// Builds and returns the configured protocol stream.
    ///
    /// See the module-level docs for details on stream behavior and emitted messages.
    /// This method applies all builder settings and starts the stream.
    pub async fn build(
        self,
    ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
        initialize_hook_handlers().map_err(|e| {
            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
        })?;
        let (_, rx) = self.stream_builder.build().await?;
        let decoder = Arc::new(self.decoder);
        let chain = self.chain;
        let stream_end_policy = self.stream_end_policy;

        let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
            if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
                let (gated_tx, gated_rx) =
                    tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
                Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
                Box::new(ReceiverStream::new(gated_rx))
            } else {
                let normal = ReceiverStream::new(rx)
                    .take_while(move |msg| match msg {
                        Ok(msg) => {
                            let states = msg.sync_states.values();
                            if stream_end_policy.should_end(states) {
                                error!(
                                    "Block stream ended due to {:?}: {:?}",
                                    stream_end_policy, msg.sync_states
                                );
                                futures::future::ready(false)
                            } else {
                                futures::future::ready(true)
                            }
                        }
                        Err(e) => {
                            error!("Block stream ended with terminal error: {e}");
                            futures::future::ready(false)
                        }
                    })
                    .map(|msg| msg.expect("Safe since stream ends if we receive an error"));
                Box::new(Box::pin(normal))
            };

        let stream = Box::pin(decode_stream.then({
            let decoder = decoder.clone();
            move |msg| {
                let decoder = decoder.clone();
                async move {
                    decoder.decode(&msg).await.map_err(|e| {
                        debug!(msg=?msg, "Decode error: {}", e);
                        e
                    })
                }
            }
        }));
        let stream = inject_native_wrapper(stream, chain);
        Ok(stream)
    }
}

/// Wraps a decoded protocol stream to inject a `NativeWrapperState` component
/// on the first successful update.
///
/// Skips injection for chains where the native and wrapped-native tokens share
/// the same address (e.g. Starknet).
fn inject_native_wrapper(
    inner: impl Stream<Item = Result<Update, StreamDecodeError>> + Unpin + Send + 'static,
    chain: Chain,
) -> impl Stream<Item = Result<Update, StreamDecodeError>> + Send {
    let has_distinct_wrapper = chain.native_token().address != chain.wrapped_native_token().address;
    if !has_distinct_wrapper {
        return Either::Left(inner);
    }

    Either::Right(
        stream::once(async move {
            let mut inner = inner;
            let first = inner.next().await;
            let modified = first.into_iter().map(move |result| {
                result.map(|mut update| {
                    let component = NativeWrapperState::component(chain);
                    let id = component.id.to_string();
                    update
                        .new_pairs
                        .insert(id.clone(), component);
                    update
                        .states
                        .insert(id, Box::new(NativeWrapperState::new(chain)));
                    debug!("Injected native_wrapper component for {chain}");
                    update
                })
            });
            stream::iter(modified).chain(inner)
        })
        .flatten(),
    )
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{stream, StreamExt};
    use tycho_common::models::Chain;

    use super::*;
    use crate::protocol::models::Update;

    fn empty_update(block: u64) -> Update {
        Update::new(block, HashMap::new(), HashMap::new())
    }

    #[tokio::test]
    async fn test_inject_native_wrapper_first_message_only() {
        let updates = vec![Ok(empty_update(1)), Ok(empty_update(2)), Ok(empty_update(3))];
        let input = stream::iter(updates);

        let results: Vec<_> = inject_native_wrapper(input, Chain::Ethereum)
            .collect()
            .await;

        assert_eq!(results.len(), 3);

        let expected_id = NativeWrapperState::component(Chain::Ethereum)
            .id
            .to_string();

        let first = results[0]
            .as_ref()
            .expect("first update ok");
        assert!(
            first
                .new_pairs
                .contains_key(&expected_id),
            "first message should have native_wrapper component"
        );
        assert!(
            first.states.contains_key(&expected_id),
            "first message should have native_wrapper state"
        );

        let second = results[1]
            .as_ref()
            .expect("second update ok");
        assert!(
            !second
                .new_pairs
                .contains_key(&expected_id),
            "second message should NOT have native_wrapper component"
        );
        assert!(
            !second.states.contains_key(&expected_id),
            "second message should NOT have native_wrapper state"
        );
    }

    /// Verifies that `with_step_controller` returns both a modified builder and a controller.
    ///
    /// This test only checks that the builder method is callable and that the returned controller
    /// compiles — it does not start any network connection.
    #[tokio::test]
    async fn test_with_step_controller_returns_controller() {
        let builder = ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum);
        let (_builder, controller) = builder.with_step_controller();
        // The controller was successfully returned — verifying the public API is callable.
        drop(controller);
    }

    /// Connects to a live Tycho instance, verifies that the stream blocks until
    /// `trigger_next_block` is called, and that `peek_next_block` exposes the buffered message.
    #[ignore = "requires live Tycho connection (TYCHO_AUTH_TOKEN env var)"]
    #[tokio::test]
    async fn test_step_controller_trigger_releases_block() {
        use std::{env, time::Duration};

        use crate::evm::protocol::uniswap_v2::state::UniswapV2State;

        let auth = env::var("TYCHO_AUTH_TOKEN").expect("TYCHO_AUTH_TOKEN must be set");

        // Track a single well-known pool to minimise startup latency.
        let usdc_weth_v2 = "0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc".to_string();
        let (builder, controller) =
            ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
                .auth_key(Some(auth))
                .exchange::<UniswapV2State>(
                    "uniswap_v2",
                    ComponentFilter::Ids(vec![usdc_weth_v2]),
                    None,
                )
                .with_step_controller();

        let (stream, _pending) = builder
            .build_with_pending()
            .await
            .expect("build_with_pending failed");
        tokio::pin!(stream);

        // Wait up to 60 s for the first block to arrive in the gating buffer.
        let peeked = tokio::time::timeout(Duration::from_secs(60), controller.peek_next_block())
            .await
            .expect("timed out waiting for first block to buffer")
            .expect("stream ended before a block arrived");

        assert!(!peeked.sync_states.is_empty(), "peeked block should carry sync states");

        // Stream must be empty before we trigger — the gating task should be holding the block.
        let pre_trigger = tokio::time::timeout(Duration::from_millis(200), stream.next()).await;
        assert!(
            pre_trigger.is_err(),
            "stream should be blocked before trigger_next_block, got an item"
        );

        // Release the block.
        controller
            .trigger_next_block()
            .expect("trigger_next_block failed");

        // Stream should now yield the decoded update within one block time.
        let update = tokio::time::timeout(Duration::from_secs(30), stream.next())
            .await
            .expect("timed out waiting for update after trigger")
            .expect("stream ended unexpectedly");

        assert!(update.is_ok(), "decoded update should be Ok, got: {:?}", update);
    }
}