Builder for configuring a multi-protocol stream.
Provides a builder for creating a multi-protocol stream that produces protocol state update messages. It runs one synchronization worker per protocol and a supervisor that aggregates updates, ensuring gap‑free streaming and robust state tracking.
§Context
This stream wraps a TychoStream from tycho-client. It decodes FeedMessages
into protocol state updates. Internally, each protocol runs in its own
synchronization worker, and a supervisor aggregates their messages per block.
§Protocol Synchronization Worker
A synchronization worker runs the snapshot + delta protocol from tycho-indexer.
- It first downloads components and their snapshots.
- It then streams deltas.
- It reacts to new or paused components by pulling snapshots or removing them from the active set.
Each worker emits snapshots and deltas to the supervisor.
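The snapshot + delta flow above can be sketched with plain standard-library types. This is a minimal illustration, not the tycho-client implementation: `WorkerMsg`, `TrackedState`, and the `u64` stand-in for protocol state are hypothetical names introduced only for this example.

```rust
use std::collections::HashMap;

/// Hypothetical message type; the real worker messages are richer.
#[derive(Debug, Clone, PartialEq)]
enum WorkerMsg {
    /// Full state for components that have just become active.
    Snapshot(HashMap<String, u64>),
    /// Incremental change: updated values plus components that were paused.
    Delta {
        updated: HashMap<String, u64>,
        removed: Vec<String>,
    },
}

/// Hypothetical local view of the active component set.
#[derive(Debug, Default)]
struct TrackedState {
    components: HashMap<String, u64>,
}

impl TrackedState {
    fn apply(&mut self, msg: WorkerMsg) {
        match msg {
            // A snapshot adds (or overwrites) the components it contains.
            WorkerMsg::Snapshot(states) => self.components.extend(states),
            WorkerMsg::Delta { updated, removed } => {
                for (id, value) in updated {
                    // Assumption: a delta only applies to components that
                    // already have a snapshot; unknown ids are ignored.
                    if let Some(slot) = self.components.get_mut(&id) {
                        *slot = value;
                    }
                }
                // Paused components leave the active set.
                for id in removed {
                    self.components.remove(&id);
                }
            }
        }
    }
}
```

Applying a `Snapshot` first and `Delta` messages afterwards keeps the tracked set consistent: a component only receives deltas once its snapshot has been loaded.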
§Stream Supervisor
The supervisor aggregates worker messages by block and assigns sync status.
- It ensures workers produce gap-free messages.
- It flags late workers as Delayed, and marks them Stale if they exceed max_missed_blocks.
- It marks workers with terminal errors as Ended.
Aggregating by block adds small latency, since the supervisor waits briefly for
all workers to emit. This latency only applies to workers in Ready or Delayed.
The stream ends only when all workers are Stale or Ended.
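As a rough sketch of the status rules above, the function below classifies a worker by how many blocks it lags behind. It assumes lag is measured in whole blocks against the latest block seen; the real supervisor may also use timeouts. `WorkerStatus`, `classify`, and `supervisor_waits_for` are hypothetical names, not part of the library API.

```rust
/// Hypothetical mirror of the worker states named in the text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkerStatus {
    Ready,
    Delayed,
    Stale,
    Ended,
}

/// Classify a worker from its lag behind the latest block.
fn classify(
    latest_block: u64,
    worker_block: u64,
    max_missed_blocks: u64,
    terminal_error: bool,
) -> WorkerStatus {
    // A terminal error overrides any lag-based status.
    if terminal_error {
        return WorkerStatus::Ended;
    }
    let missed = latest_block.saturating_sub(worker_block);
    if missed == 0 {
        WorkerStatus::Ready
    } else if missed <= max_missed_blocks {
        WorkerStatus::Delayed
    } else {
        // The worker exceeded max_missed_blocks.
        WorkerStatus::Stale
    }
}

/// Only Ready and Delayed workers contribute to the aggregation wait.
fn supervisor_waits_for(status: WorkerStatus) -> bool {
    matches!(status, WorkerStatus::Ready | WorkerStatus::Delayed)
}
```

With `max_missed_blocks = 3`, a worker two blocks behind is `Delayed` and still waited for, while one four blocks behind is `Stale` and no longer adds latency.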
§Configuration
The builder lets you customize:
§Protocols
Select which protocols to synchronize.
§Tokens & Minimum Token Quality
Provide token metadata up front so the decoder can initialize protocol states from startup
snapshots. set_tokens does not act as an ongoing filter — components arriving after startup
include their own token metadata. To restrict processing to specific tokens, apply that filter
in your consumer when reading new_components. New tokens arriving via stream deltas are added
automatically when their quality exceeds min_token_quality.
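The quality threshold for tokens arriving via deltas can be pictured as follows. This is a sketch under assumptions: `TokenInfo` and `register_new_tokens` are hypothetical, and the strict `>` comparison follows the wording "exceeds" above rather than a check of the library source.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified token record (the real token type has more fields).
#[derive(Debug, Clone, PartialEq)]
struct TokenInfo {
    address: String,
    symbol: String,
    quality: u32,
}

/// Add tokens whose quality exceeds the threshold; returns how many were added.
fn register_new_tokens(
    known: &mut HashMap<String, TokenInfo>,
    incoming: Vec<TokenInfo>,
    min_token_quality: u32,
) -> usize {
    let mut added = 0;
    for token in incoming {
        // Assumption: strict comparison, and already-known tokens are kept as-is.
        if token.quality > min_token_quality && !known.contains_key(&token.address) {
            known.insert(token.address.clone(), token);
            added += 1;
        }
    }
    added
}
```

Tokens below the threshold are simply never registered, which is different from filtering components by token: that filter belongs in the consumer.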
§StreamEndPolicy
Control when the stream ends based on worker states. By default, it ends when all
workers are Stale or Ended.
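To make the idea of an end policy concrete, here is a small sketch of how a policy could be evaluated against worker states. The `EndPolicy` enum and its variant names are hypothetical and chosen for illustration; consult the StreamEndPolicy documentation for the variants the library actually provides.

```rust
/// Hypothetical mirror of the worker states named in the text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkerStatus {
    Ready,
    Delayed,
    Stale,
    Ended,
}

/// Hypothetical policy enum; variant names are illustrative only.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
enum EndPolicy {
    /// Mirrors the documented default: end when every worker is Stale or Ended.
    #[default]
    AllStaleOrEnded,
    /// A stricter alternative: end as soon as any worker has Ended.
    AnyEnded,
}

impl EndPolicy {
    fn should_end(self, statuses: &[WorkerStatus]) -> bool {
        match self {
            // Assumption: a stream with no workers is not treated as finished.
            EndPolicy::AllStaleOrEnded => {
                !statuses.is_empty()
                    && statuses
                        .iter()
                        .all(|s| matches!(s, WorkerStatus::Stale | WorkerStatus::Ended))
            }
            EndPolicy::AnyEnded => statuses.iter().any(|s| *s == WorkerStatus::Ended),
        }
    }
}
```

Under the default, one healthy worker is enough to keep the stream alive; a stricter policy trades availability for the guarantee that every selected protocol is still being tracked.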
§Stream
The stream emits one protocol state update every block_time. Each update
reports protocol synchronization states and any changes.
The new_components field lists newly deployed components and their tokens.
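A consumer that only cares about certain tokens can filter new_components itself. The sketch below assumes a simplified shape, a map from component id to token addresses, which is not the real type of the field; `allowed_component_ids` is a hypothetical helper.

```rust
use std::collections::{HashMap, HashSet};

/// Return the ids of components whose tokens are all in the allowlist.
/// `new_components` is a simplified stand-in: component id -> token addresses.
fn allowed_component_ids(
    new_components: &HashMap<String, Vec<String>>,
    allowed_tokens: &HashSet<String>,
) -> Vec<String> {
    let mut ids: Vec<String> = new_components
        .iter()
        // Keep a component only if every one of its tokens is allowed.
        .filter(|(_, tokens)| tokens.iter().all(|t| allowed_tokens.contains(t)))
        .map(|(id, _)| id.clone())
        .collect();
    // HashMap iteration order is unspecified, so sort for stable output.
    ids.sort();
    ids
}
```

Running this on each update before storing components keeps the restriction in the consumer, where the Tokens section says it belongs.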
The stream aims to run indefinitely. Internal retry and reconnect logic handles most errors, so users should rarely need to restart it manually.
§Example
use tycho_common::models::Chain;
use tycho_simulation::evm::stream::ProtocolStreamBuilder;
use tycho_simulation::utils::load_all_tokens;
use futures::StreamExt;
use tycho_client::feed::component_tracker::ComponentFilter;
use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
use std::collections::HashSet;

#[tokio::main]
async fn main() {
    let all_tokens = load_all_tokens(
        "tycho-beta.propellerheads.xyz",
        false,
        Some("sampletoken"),
        true,
        Chain::Ethereum,
        None,
        None,
    )
    .await
    .expect("Failed loading tokens");

    let protocol_stream =
        ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .auth_key(Some("sampletoken".to_string()))
            .skip_state_decode_failures(true)
            .exchange::<UniswapV2State>(
                "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
            )
            .blocklist_components(HashSet::new())
            .set_tokens(all_tokens)
            .await
            .build()
            .await
            .expect("Failed building protocol stream");

    tokio::pin!(protocol_stream);

    // Loop through block updates
    while let Some(msg) = protocol_stream.next().await {
        dbg!(msg).expect("failed decoding");
    }
}
Structs§
- ProtocolStreamBuilder - Builds and configures the multi-protocol stream described in the module-level docs.