tycho-simulation 0.255.1

Provides tools for interacting with protocol states, calculating spot prices, and quoting token swaps.
//! Builder for configuring a multi-protocol stream.
//!
//! Provides a builder for creating a multi-protocol stream that produces
//! [`protocol::models::Update`] messages. It runs one synchronization worker per protocol
//! and a supervisor that aggregates updates, ensuring gap-free streaming
//! and robust state tracking.
//!
//! ## Context
//!
//! This stream wraps [`tycho_client::stream::TychoStream`]. It decodes `FeedMessage`s
//! into [`protocol::models::Update`]s. Internally, each protocol runs in its own
//! synchronization worker, and a supervisor aggregates their messages per block.
//!
//! ### Protocol Synchronization Worker
//! A synchronization worker runs the snapshot + delta protocol from `tycho-indexer`.
//! - It first downloads components and their snapshots.
//! - It then streams deltas.
//! - It reacts to new or paused components by pulling snapshots or removing them from the active
//!   set.
//!
//! Each worker emits snapshots and deltas to the supervisor.
//!
//! ### Stream Supervisor
//! The supervisor aggregates worker messages by block and assigns sync status.
//! - It ensures workers produce gap-free messages.
//! - It flags late workers as `Delayed`, and marks them `Stale` if they exceed `max_missed_blocks`.
//! - It marks workers with terminal errors as `Ended`.
//!
//! Aggregating by block adds a small latency, since the supervisor waits briefly for
//! all workers to emit. This latency only applies to workers in the `Ready` or `Delayed` state.
//!
//! The stream ends only when **all** workers are `Stale` or `Ended`.
//!
//! ## Configuration
//!
//! The builder lets you customize:
//!
//! ### Protocols
//! Select which protocols to synchronize.
//!
//! ### Tokens & Minimum Token Quality
//! Provide an initial set of tokens of interest. The first message includes only
//! components whose tokens match this set. The stream adds new tokens automatically
//! when a component is deployed and the token quality exceeds `min_token_quality`.
//!
//! ### StreamEndPolicy
//! Control when the stream ends based on worker states. By default, it ends when all
//! workers are `Stale` or `Ended`.
//!
//! ## Stream
//! The stream emits one [`protocol::models::Update`] every `block_time`. Each update
//! reports protocol synchronization states and any changes.
//!
//! The `new_components` field lists newly deployed components and their tokens.
//!
//! The stream aims to run indefinitely. Internal retry and reconnect logic handle
//! most errors, so users should rarely need to restart it manually.
//!
//! ## Example
//! ```no_run
//! use tycho_common::models::Chain;
//! use tycho_simulation::evm::stream::ProtocolStreamBuilder;
//! use tycho_simulation::utils::load_all_tokens;
//! use futures::StreamExt;
//! use tycho_client::feed::component_tracker::ComponentFilter;
//! use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
//! use std::collections::HashSet;
//!
//! #[tokio::main]
//! async fn main() {
//!     let all_tokens = load_all_tokens(
//!         "tycho-beta.propellerheads.xyz",
//!         false,
//!         Some("sampletoken"),
//!         true,
//!         Chain::Ethereum,
//!         None,
//!         None,
//!     )
//!     .await
//!     .expect("Failed loading tokens");
//!
//!     let mut protocol_stream =
//!         ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
//!             .auth_key(Some("sampletoken".to_string()))
//!             .skip_state_decode_failures(true)
//!             .exchange::<UniswapV2State>(
//!                 "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
//!             )
//!             .blocklist_components(HashSet::new())
//!             .set_tokens(all_tokens)
//!             .await
//!             .build()
//!             .await
//!             .expect("Failed building protocol stream");
//!
//!     // Loop through block updates
//!     while let Some(msg) = protocol_stream.next().await {
//!         dbg!(msg).expect("failed decoding");
//!     }
//! }
//! ```
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time,
};

use futures::{Stream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, warn};
use tycho_client::{
    feed::{
        component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
        SynchronizerState,
    },
    stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
};
use tycho_common::{
    models::{token::Token, Chain},
    simulation::protocol_sim::ProtocolSim,
    Bytes,
};

use crate::{
    evm::{
        decoder::{StreamDecodeError, TychoStreamDecoder},
        protocol::uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
    },
    protocol::{
        errors::InvalidSnapshotError,
        models::{DecoderContext, TryFromWithBlock, Update},
    },
};

const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];

#[derive(Default, Debug, Clone, Copy)]
pub enum StreamEndPolicy {
    /// End stream if all states are Stale or Ended (default)
    #[default]
    AllEndedOrStale,
    /// End stream if any protocol ended
    AnyEnded,
    /// End stream if any protocol ended or is stale
    AnyEndedOrStale,
    /// End stream if any protocol is stale
    AnyStale,
}

impl StreamEndPolicy {
    fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
        let mut it = states.into_iter();
        match self {
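            // The underlying stream already ends once all workers are stale or ended, so this
            // policy never needs to end it early.
            StreamEndPolicy::AllEndedOrStale => false,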
            StreamEndPolicy::AnyEnded => it.any(|s| matches!(s, SynchronizerState::Ended(_))),
            StreamEndPolicy::AnyStale => it.any(|s| matches!(s, SynchronizerState::Stale(_))),
            StreamEndPolicy::AnyEndedOrStale => {
                it.any(|s| matches!(s, SynchronizerState::Stale(_) | SynchronizerState::Ended(_)))
            }
        }
    }
}

/// Builds and configures the multi-protocol stream described in the [module-level docs](self).
///
/// See the module documentation for details on protocols, configuration options, and
/// stream behavior.
pub struct ProtocolStreamBuilder {
    decoder: TychoStreamDecoder<BlockHeader>,
    stream_builder: TychoStreamBuilder,
    stream_end_policy: StreamEndPolicy,
}

impl ProtocolStreamBuilder {
    /// Creates a new builder for a multi-protocol stream.
    ///
    /// See the [module-level docs](self) for full details on stream behavior and configuration.
    pub fn new(tycho_url: &str, chain: Chain) -> Self {
        Self {
            decoder: TychoStreamDecoder::new(),
            stream_builder: TychoStreamBuilder::new(tycho_url, chain.into()),
            stream_end_policy: StreamEndPolicy::default(),
        }
    }

    /// Adds a specific exchange to the stream.
    ///
    /// This configures the builder to include a new protocol synchronizer for `name`,
    /// filtering its components according to `filter` and optionally `filter_fn`.
    ///
    /// The type parameter `T` specifies the decoder type for this exchange. All
    /// component states for this exchange will be decoded into instances of `T`.
    ///
    /// # Parameters
    ///
    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
    /// - `filter`: Defines the set of components to include in the stream.
    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
    ///   expressible in `filter`.
    ///
    /// # Notes
    ///
    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
    /// filter function is required to ensure correct decoding and quoting logic.
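    ///
    /// # Example
    ///
    /// A minimal sketch of passing a client-side filter. `keep_all` is a hypothetical
    /// placeholder predicate and the URL is illustrative; a real filter would inspect the
    /// component before deciding.
    /// ```no_run
    /// use tycho_client::feed::{
    ///     component_tracker::ComponentFilter, synchronizer::ComponentWithState,
    /// };
    /// use tycho_common::models::Chain;
    /// use tycho_simulation::evm::{
    ///     protocol::uniswap_v2::state::UniswapV2State, stream::ProtocolStreamBuilder,
    /// };
    ///
    /// // Hypothetical predicate: keeps every component.
    /// fn keep_all(_component: &ComponentWithState) -> bool {
    ///     true
    /// }
    ///
    /// let builder = ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
    ///     .exchange::<UniswapV2State>(
    ///         "uniswap_v2",
    ///         ComponentFilter::with_tvl_range(5.0, 10.0),
    ///         Some(keep_all),
    ///     );
    /// ```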
    pub fn exchange<T>(
        mut self,
        name: &str,
        filter: ComponentFilter,
        filter_fn: Option<fn(&ComponentWithState) -> bool>,
    ) -> Self
    where
        T: ProtocolSim
            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
            + Send
            + 'static,
    {
        self.stream_builder = self
            .stream_builder
            .exchange(name, filter);
        self.decoder.register_decoder::<T>(name);
        if let Some(predicate) = filter_fn {
            self.decoder
                .register_filter(name, predicate);
        }

        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
            warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
        }

        self
    }

    /// Adds a specific exchange to the stream with decoder context.
    ///
    /// This configures the builder to include a new protocol synchronizer for `name`,
    /// filtering its components according to `filter` and optionally `filter_fn`. It also registers
    /// the given [`DecoderContext`], which is useful for testing protocols that are not live yet.
    ///
    /// The type parameter `T` specifies the decoder type for this exchange. All
    /// component states for this exchange will be decoded into instances of `T`.
    ///
    /// # Parameters
    ///
    /// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
    /// - `filter`: Defines the set of components to include in the stream.
    /// - `filter_fn`: Optional custom filter function for client-side filtering of components not
    ///   expressible in `filter`.
    /// - `decoder_context`: The decoder context for this exchange
    ///
    /// # Notes
    ///
    /// For certain protocols (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`, `"vm:curve"`), omitting
    /// `filter_fn` may cause decoding errors or incorrect results. In these cases, a proper
    /// filter function is required to ensure correct decoding and quoting logic.
    pub fn exchange_with_decoder_context<T>(
        mut self,
        name: &str,
        filter: ComponentFilter,
        filter_fn: Option<fn(&ComponentWithState) -> bool>,
        decoder_context: DecoderContext,
    ) -> Self
    where
        T: ProtocolSim
            + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
            + Send
            + 'static,
    {
        self.stream_builder = self
            .stream_builder
            .exchange(name, filter);
        self.decoder
            .register_decoder_with_context::<T>(name, decoder_context);
        if let Some(predicate) = filter_fn {
            self.decoder
                .register_filter(name, predicate);
        }

        if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
            warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
        }

        self
    }

    /// Sets the block time interval for the stream.
    ///
    /// This controls how often the stream produces updates.
    pub fn block_time(mut self, block_time: u64) -> Self {
        self.stream_builder = self
            .stream_builder
            .block_time(block_time);
        self
    }

    /// Sets the network operation timeout (deprecated).
    ///
    /// Use [`Self::latency_buffer`] instead for controlling latency.
    /// This method is retained for backwards compatibility.
    #[deprecated = "Use latency_buffer instead"]
    pub fn timeout(mut self, timeout: u64) -> Self {
        self.stream_builder = self.stream_builder.timeout(timeout);
        self
    }

    /// Sets the latency buffer to aggregate same-block messages.
    ///
    /// This allows the supervisor to wait a short interval for all synchronizers to emit
    /// before aggregating.
    pub fn latency_buffer(mut self, timeout: u64) -> Self {
        self.stream_builder = self.stream_builder.timeout(timeout);
        self
    }

    /// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
    pub fn max_missed_blocks(mut self, n: u64) -> Self {
        self.stream_builder = self.stream_builder.max_missed_blocks(n);
        self
    }

    /// Sets how long a synchronizer may take to process the initial message.
    ///
    /// Useful for data-intensive protocols where startup decoding takes longer.
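    ///
    /// # Example
    ///
    /// A minimal sketch; the two-minute value and the URL are illustrative assumptions, not
    /// recommended settings.
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// use tycho_common::models::Chain;
    /// use tycho_simulation::evm::stream::ProtocolStreamBuilder;
    ///
    /// // Allow up to two minutes for the initial message to be processed.
    /// let builder = ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
    ///     .startup_timeout(Duration::from_secs(120));
    /// ```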
    pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
        self.stream_builder = self
            .stream_builder
            .startup_timeout(timeout);
        self
    }

    /// Configures the stream to exclude state updates.
    ///
    /// This reduces bandwidth and decoding workload if protocol state is not of
    /// interest (e.g. only process new tokens).
    pub fn no_state(mut self, no_state: bool) -> Self {
        self.stream_builder = self.stream_builder.no_state(no_state);
        self
    }

    /// Sets the API key for authenticating with the Tycho server.
    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
        self.stream_builder = self.stream_builder.auth_key(auth_key);
        self
    }

    /// Disables TLS/SSL for the connection, using the `http` and `ws` protocols.
    ///
    /// This is not recommended for production use.
    pub fn no_tls(mut self, no_tls: bool) -> Self {
        self.stream_builder = self.stream_builder.no_tls(no_tls);
        self
    }

    /// Disables compression for the connection.
    pub fn disable_compression(mut self) -> Self {
        self.stream_builder = self
            .stream_builder
            .disable_compression();
        self
    }

    /// Enables partial block updates (flashblocks).
    pub fn enable_partial_blocks(mut self) -> Self {
        self.stream_builder = self
            .stream_builder
            .enable_partial_blocks();
        self
    }

    /// Excludes specific component IDs from all registered exchanges.
    pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
        if !ids.is_empty() {
            tracing::info!("Blocklisting {} components", ids.len());
            self.stream_builder = self.stream_builder.blocklisted_ids(ids);
        }
        self
    }

    /// Sets the stream end policy.
    ///
    /// Controls when the stream should stop based on synchronizer states.
    ///
    /// ## Note
    /// Regardless of this setting, the stream ends at the latest once all protocols are stale
    /// or ended. This policy only lets you end the stream earlier than that.
    ///
    /// See [`StreamEndPolicy`] for the available options.
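    ///
    /// # Example
    ///
    /// A minimal sketch of ending the stream as soon as any protocol becomes stale. The URL is
    /// illustrative.
    /// ```no_run
    /// use tycho_common::models::Chain;
    /// use tycho_simulation::evm::stream::{ProtocolStreamBuilder, StreamEndPolicy};
    ///
    /// let builder = ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
    ///     .stream_end_policy(StreamEndPolicy::AnyStale);
    /// ```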
    pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
        self.stream_end_policy = stream_end_policy;
        self
    }

    /// Sets the initial tokens to consider during decoding.
    ///
    /// Only components containing these tokens will be decoded initially.
    /// New tokens may be added automatically if they meet the quality threshold.
    pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
        self.decoder.set_tokens(tokens).await;
        self
    }

    /// Skips decoding errors for component state updates.
    ///
    /// Allows the stream to continue processing even if some states fail to decode,
    /// logging a warning instead of panicking.
    pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
        self.decoder
            .skip_state_decode_failures(skip);
        self
    }

    /// Sets the minimum token quality for tokens added via the stream.
    ///
    /// Tokens arriving in stream deltas below this threshold are ignored. Defaults to 100.
    /// Set this to the same value used in [`crate::utils::load_all_tokens`] to apply consistent
    /// filtering.
    pub fn min_token_quality(mut self, quality: u32) -> Self {
        self.decoder.min_token_quality(quality);
        self
    }

    /// Configures the retry policy for websocket reconnects.
    pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
        self.stream_builder = self
            .stream_builder
            .websockets_retry_config(config);
        self
    }

    /// Configures the retry policy for state synchronization.
    pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
        self.stream_builder = self
            .stream_builder
            .state_synchronizer_retry_config(config);
        self
    }

    /// Returns a reference to the decoder configured by this builder.
    pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
        &self.decoder
    }

    /// Builds and returns the configured protocol stream.
    ///
    /// See the module-level docs for details on stream behavior and emitted messages.
    /// This method applies all builder settings and starts the stream.
    pub async fn build(
        self,
    ) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
        initialize_hook_handlers().map_err(|e| {
            StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
        })?;
        let (_, rx) = self.stream_builder.build().await?;
        let decoder = Arc::new(self.decoder);

        let stream = Box::pin(
            ReceiverStream::new(rx)
                .take_while(move |msg| match msg {
                    Ok(msg) => {
                        let states = msg.sync_states.values();
                        if self
                            .stream_end_policy
                            .should_end(states)
                        {
                            error!(
                                "Block stream ended due to {:?}: {:?}",
                                self.stream_end_policy, msg.sync_states
                            );
                            futures::future::ready(false)
                        } else {
                            futures::future::ready(true)
                        }
                    }
                    Err(e) => {
                        error!("Block stream ended with terminal error: {e}");
                        futures::future::ready(false)
                    }
                })
                .then({
                    let decoder = decoder.clone(); // Clone the decoder for the closure
                    move |msg| {
                        let decoder = decoder.clone(); // Clone again for the async block
                        async move {
                            let msg = msg.expect("Safe since stream ends if we receive an error");
                            decoder.decode(&msg).await.map_err(|e| {
                                debug!(msg=?msg, "Decode error: {}", e);
                                e
                            })
                        }
                    }
                }),
        );
        Ok(stream)
    }
}