use std::{
collections::{HashMap, HashSet},
sync::Arc,
time,
};
use futures::{Stream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, warn};
use tycho_client::{
feed::{
component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
SynchronizerState,
},
stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
};
use tycho_common::{
models::{token::Token, Chain},
simulation::protocol_sim::ProtocolSim,
Bytes,
};
use crate::{
evm::{
decoder::{StreamDecodeError, TychoStreamDecoder},
protocol::uniswap_v4::hooks::hook_handler_creator::initialize_hook_handlers,
},
protocol::{
errors::InvalidSnapshotError,
models::{DecoderContext, TryFromWithBlock, Update},
},
};
/// Exchange names for which a warning is logged when they are registered on a
/// `ProtocolStreamBuilder` without a filter function.
const EXCHANGES_REQUIRING_FILTER: [&str; 2] = ["vm:balancer_v2", "vm:curve"];
/// Policy deciding, from the per-synchronizer states attached to each message, when the
/// built protocol stream should stop.
#[derive(Default, Debug, Clone, Copy)]
pub enum StreamEndPolicy {
/// Default policy. `should_end` never reports an end for it.
/// NOTE(review): presumably the upstream stream terminates on its own once every
/// synchronizer has ended or gone stale — confirm against tycho-client.
#[default]
AllEndedOrStale,
/// End as soon as any synchronizer state is `SynchronizerState::Ended`.
AnyEnded,
/// End as soon as any synchronizer state is `SynchronizerState::Ended` or
/// `SynchronizerState::Stale`.
AnyEndedOrStale,
/// End as soon as any synchronizer state is `SynchronizerState::Stale`.
AnyStale,
}
impl StreamEndPolicy {
    /// Reports whether the stream should stop given the current synchronizer `states`.
    ///
    /// `AllEndedOrStale` always yields `false` without inspecting any state. The other
    /// policies yield `true` as soon as one state matches the kind(s) they watch for
    /// (`Ended`, `Stale`, or either), and `false` if none does.
    fn should_end<'a>(&self, states: impl IntoIterator<Item = &'a SynchronizerState>) -> bool {
        let mut remaining = states.into_iter();

        // Translate the policy into the state kinds that trigger the end of the stream.
        let (ends_on_ended, ends_on_stale) = match self {
            Self::AllEndedOrStale => return false,
            Self::AnyEnded => (true, false),
            Self::AnyStale => (false, true),
            Self::AnyEndedOrStale => (true, true),
        };

        remaining.any(|state| match state {
            SynchronizerState::Ended(_) => ends_on_ended,
            SynchronizerState::Stale(_) => ends_on_stale,
            _ => false,
        })
    }
}
/// Builder that configures a Tycho client stream together with the decoder that turns its
/// messages into `Update`s.
pub struct ProtocolStreamBuilder {
/// Decoder for incoming feed messages; exchange decoders and filters are registered on it.
decoder: TychoStreamDecoder<BlockHeader>,
/// Underlying client stream builder that the connection and sync settings are forwarded to.
stream_builder: TychoStreamBuilder,
/// Policy checked against each message's synchronizer states when the stream is built.
stream_end_policy: StreamEndPolicy,
}
impl ProtocolStreamBuilder {
pub fn new(tycho_url: &str, chain: Chain) -> Self {
Self {
decoder: TychoStreamDecoder::new(),
stream_builder: TychoStreamBuilder::new(tycho_url, chain.into()),
stream_end_policy: StreamEndPolicy::default(),
}
}
pub fn exchange<T>(
mut self,
name: &str,
filter: ComponentFilter,
filter_fn: Option<fn(&ComponentWithState) -> bool>,
) -> Self
where
T: ProtocolSim
+ TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
+ Send
+ 'static,
{
self.stream_builder = self
.stream_builder
.exchange(name, filter);
self.decoder.register_decoder::<T>(name);
if let Some(predicate) = filter_fn {
self.decoder
.register_filter(name, predicate);
}
if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
}
self
}
pub fn exchange_with_decoder_context<T>(
mut self,
name: &str,
filter: ComponentFilter,
filter_fn: Option<fn(&ComponentWithState) -> bool>,
decoder_context: DecoderContext,
) -> Self
where
T: ProtocolSim
+ TryFromWithBlock<ComponentWithState, BlockHeader, Error = InvalidSnapshotError>
+ Send
+ 'static,
{
self.stream_builder = self
.stream_builder
.exchange(name, filter);
self.decoder
.register_decoder_with_context::<T>(name, decoder_context);
if let Some(predicate) = filter_fn {
self.decoder
.register_filter(name, predicate);
}
if EXCHANGES_REQUIRING_FILTER.contains(&name) && filter_fn.is_none() {
warn!("Warning: For exchange type '{}', it is necessary to set a filter function because not all pools are supported. See all filters at src/evm/protocol/filters.rs", name);
}
self
}
pub fn block_time(mut self, block_time: u64) -> Self {
self.stream_builder = self
.stream_builder
.block_time(block_time);
self
}
#[deprecated = "Use latency_buffer instead"]
pub fn timeout(mut self, timeout: u64) -> Self {
self.stream_builder = self.stream_builder.timeout(timeout);
self
}
pub fn latency_buffer(mut self, timeout: u64) -> Self {
self.stream_builder = self.stream_builder.timeout(timeout);
self
}
pub fn max_missed_blocks(mut self, n: u64) -> Self {
self.stream_builder = self.stream_builder.max_missed_blocks(n);
self
}
pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
self.stream_builder = self
.stream_builder
.startup_timeout(timeout);
self
}
pub fn no_state(mut self, no_state: bool) -> Self {
self.stream_builder = self.stream_builder.no_state(no_state);
self
}
pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
self.stream_builder = self.stream_builder.auth_key(auth_key);
self
}
pub fn no_tls(mut self, no_tls: bool) -> Self {
self.stream_builder = self.stream_builder.no_tls(no_tls);
self
}
pub fn disable_compression(mut self) -> Self {
self.stream_builder = self
.stream_builder
.disable_compression();
self
}
pub fn enable_partial_blocks(mut self) -> Self {
self.stream_builder = self
.stream_builder
.enable_partial_blocks();
self
}
pub fn blocklist_components(mut self, ids: HashSet<String>) -> Self {
if !ids.is_empty() {
tracing::info!("Blocklisting {} components", ids.len());
self.stream_builder = self.stream_builder.blocklisted_ids(ids);
}
self
}
pub fn stream_end_policy(mut self, stream_end_policy: StreamEndPolicy) -> Self {
self.stream_end_policy = stream_end_policy;
self
}
pub async fn set_tokens(self, tokens: HashMap<Bytes, Token>) -> Self {
self.decoder.set_tokens(tokens).await;
self
}
pub fn skip_state_decode_failures(mut self, skip: bool) -> Self {
self.decoder
.skip_state_decode_failures(skip);
self
}
pub fn min_token_quality(mut self, quality: u32) -> Self {
self.decoder.min_token_quality(quality);
self
}
pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
self.stream_builder = self
.stream_builder
.websockets_retry_config(config);
self
}
pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
self.stream_builder = self
.stream_builder
.state_synchronizer_retry_config(config);
self
}
pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
&self.decoder
}
pub async fn build(
self,
) -> Result<impl Stream<Item = Result<Update, StreamDecodeError>>, StreamError> {
initialize_hook_handlers().map_err(|e| {
StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
})?;
let (_, rx) = self.stream_builder.build().await?;
let decoder = Arc::new(self.decoder);
let stream = Box::pin(
ReceiverStream::new(rx)
.take_while(move |msg| match msg {
Ok(msg) => {
let states = msg.sync_states.values();
if self
.stream_end_policy
.should_end(states)
{
error!(
"Block stream ended due to {:?}: {:?}",
self.stream_end_policy, msg.sync_states
);
futures::future::ready(false)
} else {
futures::future::ready(true)
}
}
Err(e) => {
error!("Block stream ended with terminal error: {e}");
futures::future::ready(false)
}
})
.then({
let decoder = decoder.clone(); move |msg| {
let decoder = decoder.clone(); async move {
let msg = msg.expect("Save since stream ends if we receive an error");
decoder.decode(&msg).await.map_err(|e| {
debug!(msg=?msg, "Decode error: {}", e);
e
})
}
}
}),
);
Ok(stream)
}
}