Module stream

Module stream 

Source
Expand description

Builder for configuring a multi-protocol stream.

Provides a builder for creating a multi-protocol stream that produces [protocol::models::Update] messages. It runs one synchronization worker per protocol and a supervisor that aggregates updates, ensuring gap‑free streaming and robust state tracking.

§Context

This stream wraps [tycho_client::stream::TychoStream]. It decodes FeedMessages into [protocol::models::Update]s. Internally, each protocol runs in its own synchronization worker, and a supervisor aggregates their messages per block.

§Protocol Synchronization Worker

A synchronization worker runs the snapshot + delta protocol from tycho-indexer.

  • It first downloads components and their snapshots.
  • It then streams deltas.
  • It reacts to new or paused components by pulling snapshots or removing them from the active set.

Each worker emits snapshots and deltas to the supervisor.

§Stream Supervisor

The supervisor aggregates worker messages by block and assigns sync status.

  • It ensures workers produce gap-free messages.
  • It flags late workers as Delayed, and marks them Stale if they exceed max_missed_blocks.
  • It marks workers with terminal errors as Ended.

Aggregating by block adds small latency, since the supervisor waits briefly for all workers to emit. This latency only applies to workers in Ready or Delayed.

The stream ends only when all workers are Stale or Ended.

§Configuration

The builder lets you customize:

§Protocols

Select which protocols to synchronize.

§Tokens & Minimum Token Quality

Provide an initial set of tokens of interest. The first message includes only components whose tokens match this set. The stream adds new tokens automatically when a component is deployed and its quality exceeds min_token_quality.

§StreamEndPolicy

Control when the stream ends based on worker states. By default, it ends when all workers are Stale or Ended.

§Stream

The stream emits one [protocol::models::Update] every block_time. Each update reports protocol synchronization states and any changes.

The new_components field lists newly deployed components and their tokens.

The stream aims to run indefinitely. Internal retry and reconnect logic handle most errors, so users should rarely need to restart it manually.

§Example

use tycho_common::models::Chain;
use tycho_simulation::evm::stream::ProtocolStreamBuilder;
use tycho_simulation::utils::load_all_tokens;
use futures::StreamExt;
use tycho_client::feed::component_tracker::ComponentFilter;
use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;

#[tokio::main]
async fn main() {
    let all_tokens = load_all_tokens(
        "tycho-beta.propellerheads.xyz",
        false,
        Some("sampletoken"),
        true,
        Chain::Ethereum,
        None,
        None,
    )
    .await
    .expect("Failed loading tokens");

    let mut protocol_stream =
        ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .auth_key(Some("sampletoken".to_string()))
            .skip_state_decode_failures(true)
            .exchange::<UniswapV2State>(
                "uniswap_v2", ComponentFilter::with_tvl_range(5.0, 10.0), None
            )
            .set_tokens(all_tokens)
            .await
            .build()
            .await
            .expect("Failed building protocol stream");

    // Loop through block updates
    while let Some(msg) = protocol_stream.next().await {
        dbg!(msg).expect("failed decoding");
    }
}

Structs§

ProtocolStreamBuilder
Builds and configures the multi protocol stream described in the module-level docs.

Enums§

StreamEndPolicy