Builder for configuring a multi-protocol stream.
Provides a builder for creating a multi-protocol stream that produces
[protocol::models::Update] messages. The stream runs one synchronization worker per protocol
and a supervisor that aggregates their updates, ensuring gap-free streaming
and robust state tracking.
§Context
This stream wraps [tycho_client::stream::TychoStream]. It decodes FeedMessages
into [protocol::models::Update]s. Internally, each protocol runs in its own
synchronization worker, and a supervisor aggregates their messages per block.
§Protocol Synchronization Worker
A synchronization worker runs the snapshot + delta protocol from tycho-indexer.
- It first downloads components and their snapshots.
- It then streams deltas.
- It reacts to new or paused components by pulling snapshots or removing them from the active set.
Each worker emits snapshots and deltas to the supervisor.
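The worker lifecycle above can be pictured with a minimal sketch. The types below (State, WorkerEvent, Worker) are hypothetical and simplified for illustration; they are not the tycho-client or tycho-indexer types, and the real worker also handles networking and block bookkeeping.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified component state: attribute name -> value.
type State = HashMap<String, u64>;

/// Hypothetical events a worker might process (not the real message types).
enum WorkerEvent {
    /// Full state for a newly tracked component.
    Snapshot { component: String, state: State },
    /// Attribute changes for an already tracked component.
    Delta { component: String, changes: State },
    /// The component was paused and leaves the active set.
    Paused { component: String },
}

/// Tracks the active set of components for one protocol.
#[derive(Default)]
struct Worker {
    active: HashMap<String, State>,
}

impl Worker {
    /// Applies one event to the active set.
    /// Returns false when a delta targets an unknown component,
    /// which signals that a snapshot must be pulled first.
    fn apply(&mut self, event: WorkerEvent) -> bool {
        match event {
            WorkerEvent::Snapshot { component, state } => {
                self.active.insert(component, state);
                true
            }
            WorkerEvent::Delta { component, changes } => {
                match self.active.get_mut(&component) {
                    Some(state) => {
                        // Overwrite only the attributes that changed.
                        state.extend(changes);
                        true
                    }
                    None => false,
                }
            }
            WorkerEvent::Paused { component } => {
                self.active.remove(&component);
                true
            }
        }
    }
}
```

In this sketch a delta never creates a component on its own: state always starts from a snapshot, which is what keeps the tracked state consistent with the deltas that follow.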
§Stream Supervisor
The supervisor aggregates worker messages by block and assigns sync status.
- It ensures workers produce gap-free messages.
- It flags late workers as Delayed, and marks them Stale if they exceed max_missed_blocks.
- It marks workers with terminal errors as Ended.
Aggregating by block adds a small amount of latency, since the supervisor waits briefly for
all workers to emit. This latency applies only to workers in the Ready or Delayed state.
The stream ends only when all workers are Stale or Ended.
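The status rules above can be sketched as a small classification function. This is an illustration under stated assumptions, not the supervisor's actual implementation: the SyncStatus enum, the classify and stream_should_end functions, and the idea of measuring lateness as a difference in block numbers are all hypothetical.

```rust
/// Hypothetical worker status, mirroring the states named in the text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SyncStatus {
    Ready,
    Delayed,
    Stale,
    Ended,
}

/// Classifies one worker.
/// Assumption: lateness is the number of blocks the worker is behind
/// the latest block seen by the supervisor.
fn classify(
    latest_block: u64,
    worker_block: u64,
    max_missed_blocks: u64,
    terminal_error: bool,
) -> SyncStatus {
    if terminal_error {
        return SyncStatus::Ended;
    }
    let missed = latest_block.saturating_sub(worker_block);
    if missed == 0 {
        SyncStatus::Ready
    } else if missed <= max_missed_blocks {
        SyncStatus::Delayed
    } else {
        // The worker exceeded max_missed_blocks.
        SyncStatus::Stale
    }
}

/// Default end condition from the text: every worker is Stale or Ended.
/// Note that this returns true for an empty slice.
fn stream_should_end(statuses: &[SyncStatus]) -> bool {
    statuses
        .iter()
        .all(|s| matches!(s, SyncStatus::Stale | SyncStatus::Ended))
}
```

With these rules, a single healthy worker is enough to keep the stream alive, which matches the statement that the stream ends only when all workers are Stale or Ended.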
§Configuration
The builder lets you customize:
§Protocols
Select which protocols to synchronize.
§Tokens & Minimum Token Quality
Provide an initial set of tokens of interest. The first message includes only
components whose tokens match this set. The stream adds new tokens automatically
when a newly deployed component uses them and their quality exceeds min_token_quality.
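One way to picture the token filter is the sketch below. It is only an illustration: the Token struct and the admit_component function are hypothetical, the quality score is assumed to be a plain integer where higher is better, and "exceeds" is read as strictly greater than.

```rust
use std::collections::HashSet;

/// Hypothetical token record; the real token model has more fields.
struct Token {
    address: String,
    /// Assumed to be a numeric score where higher is better.
    quality: u32,
}

/// Adds the component's tokens that pass the quality threshold to `known`,
/// then reports whether every token of the component is known.
fn admit_component(
    known: &mut HashSet<String>,
    tokens: &[Token],
    min_token_quality: u32,
) -> bool {
    for token in tokens {
        // Assumption: "exceeds" means strictly greater than.
        if token.quality > min_token_quality {
            known.insert(token.address.clone());
        }
    }
    tokens.iter().all(|t| known.contains(&t.address))
}
```

In this sketch, tokens from the initial set stay known regardless of their score, while a new component with any low-quality, unknown token is rejected as a whole.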
§StreamEndPolicy
Control when the stream ends based on worker states. By default, it ends when all
workers are Stale or Ended.
§Stream
The stream emits one [protocol::models::Update] every block_time. Each update
reports protocol synchronization states and any changes.
The new_components field lists newly deployed components and their tokens.
The stream aims to run indefinitely. Internal retry and reconnect logic handle most errors, so users should rarely need to restart it manually.
§Example
use tycho_common::models::Chain;
use tycho_simulation::evm::stream::ProtocolStreamBuilder;
use tycho_simulation::utils::load_all_tokens;
use futures::StreamExt;
use tycho_client::feed::component_tracker::ComponentFilter;
use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;

#[tokio::main]
async fn main() {
    let all_tokens = load_all_tokens(
        "tycho-beta.propellerheads.xyz",
        false,
        Some("sampletoken"),
        true,
        Chain::Ethereum,
        None,
        None,
    )
    .await
    .expect("Failed loading tokens");

    let mut protocol_stream =
        ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .auth_key(Some("sampletoken".to_string()))
            .skip_state_decode_failures(true)
            .exchange::<UniswapV2State>(
                "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
            )
            .set_tokens(all_tokens)
            .await
            .build()
            .await
            .expect("Failed building protocol stream");

    // Loop through block updates
    while let Some(msg) = protocol_stream.next().await {
        dbg!(msg).expect("failed decoding");
    }
}
Structs§
- ProtocolStreamBuilder: Builds and configures the multi-protocol stream described in the module-level docs.