use std::{
collections::{HashMap, HashSet},
sync::Arc,
time,
};
use futures::{future::Either, stream, Stream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, warn};
use tycho_client::{
feed::{
component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
BlockSynchronizerError, FeedMessage, SynchronizerState,
},
stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
};
use tycho_common::{
models::{token::Token, Chain},
simulation::protocol_sim::ProtocolSim,
traits::TxDeltaIndexer,
Bytes,
};
use crate::{
evm::{
decoder::{StreamDecodeError, TychoStreamDecoder},
pending::PendingBlockProcessor,
protocol::{
native_wrapper::state::NativeWrapperState,
uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
},
},
protocol::{
errors::InvalidSnapshotError,
models::{DecoderContext, TryFromWithBlock, Update},
},
};
/// Protocol names for which `ProtocolStreamBuilder::exchange` and
/// `ProtocolStreamBuilder::exchange_with_decoder_context` log a warning when no filter
/// function is supplied, because not all of their pools are supported.
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
/// Policy deciding, from the synchronizer states carried by a feed message, whether the
/// block stream should be terminated.
#[derive(Default, Debug, Clone, Copy)]
pub enum StreamEndPolicy {
/// Default policy. `should_end` never requests termination for this variant.
// NOTE(review): presumably the upstream feed closes by itself once every synchronizer
// has ended or gone stale — confirm against tycho-client.
#[default]
AllEndedOrStale,
/// Terminate as soon as any synchronizer state is `Ended`.
AnyEnded,
/// Terminate as soon as any synchronizer state is `Stale` or `Ended`.
AnyEndedOrStale,
/// Terminate as soon as any synchronizer state is `Stale`.
AnyStale,
}
impl StreamEndPolicy {
    /// Reports whether, under this policy, the given synchronizer states require the
    /// stream to be terminated.
    ///
    /// `AllEndedOrStale` never asks for termination here; the other variants return
    /// `true` as soon as a single state matches the condition named by the variant.
    fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
        let ended = |state: &SynchronizerState| matches!(state, SynchronizerState::Ended(_));
        let stale = |state: &SynchronizerState| matches!(state, SynchronizerState::Stale(_));

        let mut candidates = states.into_iter();
        match self {
            Self::AllEndedOrStale => false,
            Self::AnyEnded => candidates.any(ended),
            Self::AnyStale => candidates.any(stale),
            Self::AnyEndedOrStale => candidates.any(|state| stale(state) || ended(state)),
        }
    }
}
/// Handle for stepping a gated block stream one message at a time.
///
/// Created by `ProtocolStreamBuilder::with_step_controller`.
pub struct BlockStepController {
/// Sends a unit signal to the gating task, which releases the message it is holding.
trigger_tx: tokio::sync::mpsc::UnboundedSender<()>,
/// Observes the message currently held back by the gating task (`None` when nothing is held).
peek_rx: tokio::sync::watch::Receiver<Option<FeedMessage<BlockHeader>>>,
}
impl BlockStepController {
    /// Signals the gating task to release the next block.
    ///
    /// Fails when the receiving side of the trigger channel no longer exists.
    pub fn trigger_next_block(&self) -> Result<(), tokio::sync::mpsc::error::SendError<()>> {
        let signal = ();
        self.trigger_tx.send(signal)
    }

    /// Returns a copy of the block currently held by the gate, without waiting.
    ///
    /// Yields `None` when no block is being held at the moment.
    pub fn try_peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
        self.peek_rx.borrow().as_ref().cloned()
    }

    /// Waits until the gate holds a block and returns a copy of it.
    ///
    /// Yields `None` if the sending side of the peek channel is gone before a block
    /// becomes available.
    pub async fn peek_next_block(&self) -> Option<FeedMessage<BlockHeader>> {
        // Wait on a private clone so the shared receiver's "seen" state is left untouched.
        let mut watcher = self.peek_rx.clone();
        let Ok(held) = watcher.wait_for(Option::is_some).await else {
            return None;
        };
        held.clone()
    }
}
/// Builder that wires a Tycho feed (`TychoStreamBuilder`) to a `TychoStreamDecoder` and
/// produces a stream of decoded `Update`s.
pub struct ProtocolStreamBuilder {
/// Decoder that turns raw feed messages into `Update`s; per-exchange decoders and filters are registered on it.
decoder: TychoStreamDecoder<BlockHeader>,
/// Underlying feed builder; most setters on this type forward to it.
stream_builder: TychoStreamBuilder,
/// Policy consulted for every feed message to decide whether the stream must end.
stream_end_policy: StreamEndPolicy,
/// Chain the stream is built for; also used when injecting the native wrapper component.
chain: Chain,
/// Pending-transaction indexers keyed by extractor name, handed to the `PendingBlockProcessor`.
pending_indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
/// Sender side of the peek channel; `Some` only after `with_step_controller` was called.
step_peek_tx: Option<tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>>,
/// Receiver side of the trigger channel; `Some` only after `with_step_controller` was called.
step_trigger_rx: Option<tokio::sync::mpsc::UnboundedReceiver<()>>,
}
impl ProtocolStreamBuilder {
pub fn new(tycho_url: &str, chain: Chain) -> Self {
Self {
decoder: TychoStreamDecoder::new(),
stream_builder: TychoStreamBuilder::new(tycho_url, chain),
stream_end_policy: StreamEndPolicy::default(),
chain,
pending_indexers: HashMap::new(),
step_peek_tx: None,
step_trigger_rx: None,
}
}
/// Subscribes to the exchange `name` and registers `T` as the decoder for its components.
///
/// `filter` is forwarded to the underlying feed builder. When `filter_fn` is given it is
/// registered on the decoder for this exchange; when it is absent and the exchange is
/// listed in `EXCHANGES_REQUIRING_FILTER`, a warning is logged.
pub fn exchange<T>(
    mut self,
    name: &str,
    filter: ComponentFilter,
    filter_fn: Option<fn(&ComponentWithState) -> bool>,
) -> Self
where
    T: ProtocolSim
        + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
        + Send
        + 'static,
{
    self.stream_builder = self.stream_builder.exchange(name, filter);
    self.decoder.register_decoder::<T>(name);

    match filter_fn {
        Some(predicate) => {
            self.decoder.register_filter(name, predicate);
        }
        None if EXCHANGES_REQUIRING_FILTER.contains(&name) => {
            warn!(
                "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
                name
            );
        }
        None => {}
    }
    self
}
/// Same as `exchange`, but registers the decoder for `T` together with `decoder_context`.
///
/// `filter` is forwarded to the underlying feed builder. When `filter_fn` is given it is
/// registered on the decoder for this exchange; when it is absent and the exchange is
/// listed in `EXCHANGES_REQUIRING_FILTER`, a warning is logged.
pub fn exchange_with_decoder_context<T>(
    mut self,
    name: &str,
    filter: ComponentFilter,
    filter_fn: Option<fn(&ComponentWithState) -> bool>,
    decoder_context: DecoderContext,
) -> Self
where
    T: ProtocolSim
        + TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
        + Send
        + 'static,
{
    self.stream_builder = self.stream_builder.exchange(name, filter);
    self.decoder
        .register_decoder_with_context::<T>(name, decoder_context);

    let filter_missing = filter_fn.is_none();
    if let Some(predicate) = filter_fn {
        self.decoder.register_filter(name, predicate);
    }

    let needs_filter = EXCHANGES_REQUIRING_FILTER.contains(&name);
    if needs_filter && filter_missing {
        warn!(
            "Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs",
            name
        );
    }
    self
}
pub fn block_time(mut self, block_time: u64) -> Self {
self.stream_builder = self
.stream_builder
.block_time(block_time);
self
}
#[deprecated = "Use latency_buffer instead"]
pub fn timeout(mut self, timeout: u64) -> Self {
self.stream_builder = self.stream_builder.timeout(timeout);
self
}
pub fn latency_buffer(mut self, timeout: u64) -> Self {
self.stream_builder = self.stream_builder.timeout(timeout);
self
}
pub fn max_missed_blocks(mut self, n: u64) -> Self {
self.stream_builder = self.stream_builder.max_missed_blocks(n);
self
}
pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
self.stream_builder = self
.stream_builder
.startup_timeout(timeout);
self
}
pub fn no_state(mut self, no_state: bool) -> Self {
self.stream_builder = self.stream_builder.no_state(no_state);
self
}
pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
self.stream_builder = self.stream_builder.auth_key(auth_key);
self
}
pub fn no_tls(mut self, no_tls: bool) -> Self {
self.stream_builder = self.stream_builder.no_tls(no_tls);
self
}
pub fn disable_compression(mut self) -> Self {
self.stream_builder = self
.stream_builder
.disable_compression();
self
}
pub fn enable_partial_blocks(mut self) -> Self {
self.stream_builder = self
.stream_builder
.enable_partial_blocks();
self
}
pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
if !ids.is_empty() {
tracing::info!("Blocklisting {} components", ids.len());
self.stream_builder = self.stream_builder.blocklisted_ids(ids);
}
self
}
pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
self.stream_end_policy = stream_end_policy;
self
}
/// Hands `tokens` to the decoder and returns the builder once the decoder has taken them.
pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
    let registration = self.decoder.set_tokens(tokens);
    registration.await;
    self
}
/// Forwards the `skip` flag to the decoder's `skip_state_decode_failures` setting.
pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
    let decoder = &mut self.decoder;
    decoder.skip_state_decode_failures(skip);
    self
}
/// Forwards the minimum token `quality` to the decoder.
pub fn min_token_quality(mut self, quality: u32) -> Self {
    let decoder = &mut self.decoder;
    decoder.min_token_quality(quality);
    self
}
pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
self.stream_builder = self
.stream_builder
.websockets_retry_config(config);
self
}
pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
self.stream_builder = self
.stream_builder
.state_synchronizer_retry_config(config);
self
}
pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
&self.decoder
}
/// Registers `indexer` as the pending-transaction indexer for `extractor`.
///
/// An indexer registered earlier under the same extractor name is replaced.
///
/// # Errors
///
/// Returns `StreamError::SetUpError` when `extractor` starts with `vm:`.
pub fn with_pending_indexer(
    mut self,
    extractor: &str,
    indexer: Box<dyn TxDeltaIndexer>,
) -> Result<Self, StreamError> {
    if !extractor.starts_with("vm:") {
        let key = extractor.to_string();
        self.pending_indexers.insert(key, indexer);
        return Ok(self);
    }
    let reason = format!(
        "extractor '{extractor}' is a VM protocol; TxDeltaIndexer only supports native protocols"
    );
    Err(StreamError::SetUpError(reason))
}
/// Enables step-by-step block release and returns the controller that drives it.
///
/// The builder keeps the peek sender and the trigger receiver; the returned
/// [`BlockStepController`] holds the matching trigger sender and peek receiver.
pub fn with_step_controller(mut self) -> (Self, BlockStepController) {
    let (trigger_tx, trigger_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
    // The peek slot starts empty: no block is held until the stream is built and running.
    let (peek_tx, peek_rx) =
        tokio::sync::watch::channel::<Option<FeedMessage<BlockHeader>>>(None);

    self.step_trigger_rx = Some(trigger_rx);
    self.step_peek_tx = Some(peek_tx);

    (self, BlockStepController { trigger_tx, peek_rx })
}
/// Spawns the task that holds raw feed messages back until the controller releases them.
///
/// For every message read from `raw_rx` the task publishes a copy on `peek_tx`, waits for
/// one signal on `trigger_rx`, clears the peek slot and forwards the message on
/// `output_tx`. Once every trigger sender has been dropped, the held message is still
/// forwarded and the task switches to pass-through mode: remaining messages are forwarded
/// without publishing or waiting.
///
/// The task finishes when the raw feed closes or yields an error, when
/// `stream_end_policy` asks to end (that message is not forwarded), or when the receiver
/// of `output_tx` has been dropped. Terminal errors and policy-driven endings are logged
/// in both gated and pass-through mode.
fn run_gating_task(
    raw_rx: tokio::sync::mpsc::Receiver<
        Result<FeedMessage<BlockHeader>, BlockSynchronizerError>,
    >,
    mut trigger_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
    peek_tx: tokio::sync::watch::Sender<Option<FeedMessage<BlockHeader>>>,
    output_tx: tokio::sync::mpsc::Sender<FeedMessage<BlockHeader>>,
    stream_end_policy: StreamEndPolicy,
) {
    tokio::spawn(async move {
        let mut raw_stream = ReceiverStream::new(raw_rx);
        // Becomes `false` once the trigger channel is closed, i.e. nobody can step anymore.
        let mut gated = true;
        loop {
            let msg = match raw_stream.next().await {
                Some(Ok(msg)) => msg,
                Some(Err(e)) => {
                    error!("Block stream ended with terminal error: {e}");
                    break;
                }
                None => break,
            };
            if stream_end_policy.should_end(msg.sync_states.values()) {
                error!(
                    "Block stream ended due to {:?}: {:?}",
                    stream_end_policy, msg.sync_states
                );
                break;
            }
            if gated {
                // Sending on the watch channel only fails when no receiver is left;
                // peeking is best-effort, so that failure is deliberately ignored.
                let _ = peek_tx.send(Some(msg.clone()));
                if trigger_rx.recv().await.is_none() {
                    // All trigger senders are gone: release this message and stop gating.
                    gated = false;
                }
                let _ = peek_tx.send(None);
            }
            if output_tx.send(msg).await.is_err() {
                // The consumer dropped the output stream; nothing left to do.
                break;
            }
        }
    });
}
pub async fn build_with_pending(
self,
) -> Result<
(impl Stream<Item = Result<Update, StreamDecodeError>>, PendingBlockProcessor),
StreamError,
> {
initialize_hook_handlers().map_err(|e| {
StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
})?;
let (_, rx) = self.stream_builder.build().await?;
let decoder = Arc::new(self.decoder);
let (advance_tx, advance_rx) =
tokio::sync::mpsc::unbounded_channel::<FeedMessage<BlockHeader>>();
let pending = PendingBlockProcessor::new(
self.pending_indexers,
decoder.clone(),
self.chain,
advance_rx,
);
let chain = self.chain;
let stream_end_policy = self.stream_end_policy;
let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
let (gated_tx, gated_rx) =
tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
Box::new(ReceiverStream::new(gated_rx))
} else {
let normal = ReceiverStream::new(rx)
.take_while(move |msg| match msg {
Ok(msg) => {
let states = msg.sync_states.values();
if stream_end_policy.should_end(states) {
error!(
"Block stream ended due to {:?}: {:?}",
stream_end_policy, msg.sync_states
);
futures::future::ready(false)
} else {
futures::future::ready(true)
}
}
Err(e) => {
error!("Block stream ended with terminal error: {e}");
futures::future::ready(false)
}
})
.map(|msg| msg.expect("Safe since stream ends if we receive an error"));
Box::new(Box::pin(normal))
};
let stream = Box::pin(decode_stream.then({
let decoder = decoder.clone();
move |msg| {
let decoder = decoder.clone();
let advance_tx = advance_tx.clone();
async move {
let _ = advance_tx.send(msg.clone());
decoder.decode(&msg).await.map_err(|e| {
debug!(msg=?msg, "Decode error: {}", e);
e
})
}
}
}));
let stream = inject_native_wrapper(stream, chain);
Ok((stream, pending))
}
pub async fn build(
self,
) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
initialize_hook_handlers().map_err(|e| {
StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
})?;
let (_, rx) = self.stream_builder.build().await?;
let decoder = Arc::new(self.decoder);
let chain = self.chain;
let stream_end_policy = self.stream_end_policy;
let decode_stream: Box<dyn Stream<Item = FeedMessage<BlockHeader>> + Send + Unpin> =
if let (Some(peek_tx), Some(trigger_rx)) = (self.step_peek_tx, self.step_trigger_rx) {
let (gated_tx, gated_rx) =
tokio::sync::mpsc::channel::<FeedMessage<BlockHeader>>(1);
Self::run_gating_task(rx, trigger_rx, peek_tx, gated_tx, stream_end_policy);
Box::new(ReceiverStream::new(gated_rx))
} else {
let normal = ReceiverStream::new(rx)
.take_while(move |msg| match msg {
Ok(msg) => {
let states = msg.sync_states.values();
if stream_end_policy.should_end(states) {
error!(
"Block stream ended due to {:?}: {:?}",
stream_end_policy, msg.sync_states
);
futures::future::ready(false)
} else {
futures::future::ready(true)
}
}
Err(e) => {
error!("Block stream ended with terminal error: {e}");
futures::future::ready(false)
}
})
.map(|msg| msg.expect("Safe since stream ends if we receive an error"));
Box::new(Box::pin(normal))
};
let stream = Box::pin(decode_stream.then({
let decoder = decoder.clone();
move |msg| {
let decoder = decoder.clone();
async move {
decoder.decode(&msg).await.map_err(|e| {
debug!(msg=?msg, "Decode error: {}", e);
e
})
}
}
}));
let stream = inject_native_wrapper(stream, chain);
Ok(stream)
}
}
/// Adds the native wrapper component and state to the first successful update of `inner`.
///
/// When the chain's native token and wrapped native token share an address, `inner` is
/// returned untouched. Otherwise only the first item is modified (and only if it is `Ok`);
/// all later items pass through unchanged.
fn inject_native_wrapper(
    inner: impl Stream<Item = Result<Update, StreamDecodeError>> + Unpin + Send + 'static,
    chain: Chain,
) -> impl Stream<Item = Result<Update, StreamDecodeError>> + Send {
    let same_address = chain.native_token().address == chain.wrapped_native_token().address;
    if same_address {
        return Either::Left(inner);
    }

    let patched = stream::once(async move {
        let mut rest = inner;
        // Pull the first item eagerly so it can be amended before being re-emitted.
        let head = rest.next().await.map(|item| {
            item.map(|mut update| {
                let component = NativeWrapperState::component(chain);
                let id = component.id.to_string();
                update.new_pairs.insert(id.clone(), component);
                update
                    .states
                    .insert(id, Box::new(NativeWrapperState::new(chain)));
                debug!("Injected native_wrapper component for {chain}");
                update
            })
        });
        stream::iter(head).chain(rest)
    })
    .flatten();

    Either::Right(patched)
}
// Unit tests for native wrapper injection and the step controller.
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use futures::{stream, StreamExt};
use tycho_common::models::Chain;
use super::*;
use crate::protocol::models::Update;
/// Builds an `Update` for `block` with two empty maps.
fn empty_update(block: u64) -> Update {
Update::new(block, HashMap::new(), HashMap::new())
}
/// Only the first update of the stream must carry the injected native wrapper
/// component and state; the number of updates must stay the same.
#[tokio::test]
async fn test_inject_native_wrapper_first_message_only() {
let updates = vec![Ok(empty_update(1)), Ok(empty_update(2)), Ok(empty_update(3))];
let input = stream::iter(updates);
let results: Vec<_> = inject_native_wrapper(input, Chain::Ethereum)
.collect()
.await;
assert_eq!(results.len(), 3);
let expected_id = NativeWrapperState::component(Chain::Ethereum)
.id
.to_string();
let first = results[0]
.as_ref()
.expect("first update ok");
assert!(
first
.new_pairs
.contains_key(&expected_id),
"first message should have native_wrapper component"
);
assert!(
first.states.contains_key(&expected_id),
"first message should have native_wrapper state"
);
let second = results[1]
.as_ref()
.expect("second update ok");
assert!(
!second
.new_pairs
.contains_key(&expected_id),
"second message should NOT have native_wrapper component"
);
assert!(
!second.states.contains_key(&expected_id),
"second message should NOT have native_wrapper state"
);
}
/// Smoke test: a controller can be created and dropped without building a stream.
#[tokio::test]
async fn test_with_step_controller_returns_controller() {
let builder = ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum);
let (_builder, controller) = builder.with_step_controller();
drop(controller);
}
/// Live test: the stream must yield nothing until `trigger_next_block` is called, and
/// must yield a decoded update afterwards.
#[ignore = "requires live Tycho connection (TYCHO_AUTH_TOKEN env var)"]
#[tokio::test]
async fn test_step_controller_trigger_releases_block() {
use std::{env, time::Duration};
use crate::evm::protocol::uniswap_v2::state::UniswapV2State;
let auth = env::var("TYCHO_AUTH_TOKEN").expect("TYCHO_AUTH_TOKEN must be set");
// Component id used to restrict the subscription to a single uniswap_v2 component.
let usdc_weth_v2 = "0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc".to_string();
let (builder, controller) =
ProtocolStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
.auth_key(Some(auth))
.exchange::<UniswapV2State>(
"uniswap_v2",
ComponentFilter::Ids(vec![usdc_weth_v2]),
None,
)
.with_step_controller();
let (stream, _pending) = builder
.build_with_pending()
.await
.expect("build_with_pending failed");
tokio::pin!(stream);
// Wait until the gate holds the first block.
let peeked = tokio::time::timeout(Duration::from_secs(60), controller.peek_next_block())
.await
.expect("timed out waiting for first block to buffer")
.expect("stream ended before a block arrived");
assert!(!peeked.sync_states.is_empty(), "peeked block should carry sync states");
// Without a trigger the stream must not produce anything: the short timeout has to elapse.
let pre_trigger = tokio::time::timeout(Duration::from_millis(200), stream.next()).await;
assert!(
pre_trigger.is_err(),
"stream should be blocked before trigger_next_block, got an item"
);
controller
.trigger_next_block()
.expect("trigger_next_block failed");
// After the trigger the held block must be released and decoded.
let update = tokio::time::timeout(Duration::from_secs(30), stream.next())
.await
.expect("timed out waiting for update after trigger")
.expect("stream ended unexpectedly");
assert!(update.is_ok(), "decoded update should be Ok, got: {:?}", update);
}
}