tycho_client/
stream.rs

1use std::{
2    collections::{HashMap, HashSet},
3    env,
4    time::Duration,
5};
6
7use thiserror::Error;
8use tokio::{sync::mpsc::Receiver, task::JoinHandle};
9use tracing::info;
10use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
11
12use crate::{
13    deltas::DeltasClient,
14    feed::{
15        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
16        BlockSynchronizer, FeedMessage,
17    },
18    rpc::RPCClient,
19    HttpRPCClient, WsDeltasClient,
20};
21
22#[derive(Error, Debug)]
23pub enum StreamError {
24    #[error("Error during stream set up: {0}")]
25    SetUpError(String),
26
27    #[error("WebSocket client connection error: {0}")]
28    WebSocketConnectionError(String),
29
30    #[error("BlockSynchronizer error: {0}")]
31    BlockSynchronizerError(String),
32}
33
34pub struct TychoStreamBuilder {
35    tycho_url: String,
36    chain: Chain,
37    exchanges: HashMap<String, ComponentFilter>,
38    block_time: u64,
39    timeout: u64,
40    max_missed_blocks: u64,
41    no_state: bool,
42    auth_key: Option<String>,
43    no_tls: bool,
44}
45
46impl TychoStreamBuilder {
47    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
48    /// Initializes the builder with default values for block time and timeout based on the chain.
49    pub fn new(tycho_url: &str, chain: Chain) -> Self {
50        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
51        Self {
52            tycho_url: tycho_url.to_string(),
53            chain,
54            exchanges: HashMap::new(),
55            block_time,
56            timeout,
57            max_missed_blocks,
58            no_state: false,
59            auth_key: None,
60            no_tls: true,
61        }
62    }
63
64    /// Returns the default block_time, timeout and max_missed_blocks values for the given
65    /// blockchain network.
66    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
67        match chain {
68            Chain::Ethereum => (12, 36, 10),
69            Chain::Starknet => (2, 8, 50),
70            Chain::ZkSync => (3, 12, 50),
71            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
72            Chain::Base => (2, 12, 50),
73            Chain::Unichain => (1, 10, 100),
74        }
75    }
76
77    /// Adds an exchange and its corresponding filter to the Tycho client.
78    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
79        self.exchanges
80            .insert(name.to_string(), filter);
81        self
82    }
83
84    /// Sets the block time for the Tycho client.
85    pub fn block_time(mut self, block_time: u64) -> Self {
86        self.block_time = block_time;
87        self
88    }
89
90    /// Sets the timeout duration for network operations.
91    pub fn timeout(mut self, timeout: u64) -> Self {
92        self.timeout = timeout;
93        self
94    }
95
96    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
97        self.max_missed_blocks = max_missed_blocks;
98        self
99    }
100
101    /// Configures the client to exclude state updates from the stream.
102    pub fn no_state(mut self, no_state: bool) -> Self {
103        self.no_state = no_state;
104        self
105    }
106
107    /// Sets the API key for authenticating with the Tycho server.
108    ///
109    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
110    /// to false if you do this.
111    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
112        self.auth_key = auth_key;
113        self.no_tls = false;
114        self
115    }
116
117    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
118    pub fn no_tls(mut self, no_tls: bool) -> Self {
119        self.no_tls = no_tls;
120        self
121    }
122
123    /// Builds and starts the Tycho client, connecting to the Tycho server and
124    /// setting up the synchronization of exchange components.
125    pub async fn build(self) -> Result<(JoinHandle<()>, Receiver<FeedMessage>), StreamError> {
126        if self.exchanges.is_empty() {
127            return Err(StreamError::SetUpError(
128                "At least one exchange must be registered.".to_string(),
129            ));
130        }
131
132        // Attempt to read the authentication key from the environment variable if not provided
133        let auth_key = self
134            .auth_key
135            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
136
137        // Determine the URLs based on the TLS setting
138        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
139            info!("Using non-secure connection: ws:// and http://");
140            let tycho_ws_url = format!("ws://{}", self.tycho_url);
141            let tycho_rpc_url = format!("http://{}", self.tycho_url);
142            (tycho_ws_url, tycho_rpc_url)
143        } else {
144            info!("Using secure connection: wss:// and https://");
145            let tycho_ws_url = format!("wss://{}", self.tycho_url);
146            let tycho_rpc_url = format!("https://{}", self.tycho_url);
147            (tycho_ws_url, tycho_rpc_url)
148        };
149
150        // Initialize the WebSocket client
151        let ws_client = WsDeltasClient::new(&tycho_ws_url, auth_key.as_deref()).unwrap();
152        let rpc_client = HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref()).unwrap();
153        let ws_jh = ws_client
154            .connect()
155            .await
156            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
157
158        // Create and configure the BlockSynchronizer
159        let mut block_sync = BlockSynchronizer::new(
160            Duration::from_secs(self.block_time),
161            Duration::from_secs(self.timeout),
162            self.max_missed_blocks,
163        );
164
165        let available_protocols_set = rpc_client
166            .get_protocol_systems(&ProtocolSystemsRequestBody {
167                chain: self.chain,
168                pagination: PaginationParams { page: 0, page_size: 100 },
169            })
170            .await
171            .unwrap()
172            .protocol_systems
173            .into_iter()
174            .collect::<HashSet<_>>();
175
176        let requested_protocol_set = self
177            .exchanges
178            .keys()
179            .cloned()
180            .collect::<HashSet<_>>();
181
182        let not_requested_protocols = available_protocols_set
183            .difference(&requested_protocol_set)
184            .cloned()
185            .collect::<Vec<_>>();
186
187        if !not_requested_protocols.is_empty() {
188            tracing::info!("Other available protocols: {}", not_requested_protocols.join(", "));
189        }
190
191        // Register each exchange with the BlockSynchronizer
192        for (name, filter) in self.exchanges {
193            info!("Registering exchange: {}", name);
194            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
195            let sync = ProtocolStateSynchronizer::new(
196                id.clone(),
197                true,
198                filter,
199                3,
200                !self.no_state,
201                rpc_client.clone(),
202                ws_client.clone(),
203                self.block_time + self.timeout,
204            );
205            block_sync = block_sync.register_synchronizer(id, sync);
206        }
207
208        // Start the BlockSynchronizer and monitor for disconnections
209        let (sync_jh, rx) = block_sync
210            .run()
211            .await
212            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
213
214        // Monitor WebSocket and BlockSynchronizer futures
215        let handle = tokio::spawn(async move {
216            tokio::select! {
217                res = ws_jh => {
218                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
219                }
220                res = sync_jh => {
221                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
222                }
223            }
224        });
225
226        Ok((handle, rx))
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    /// `build` checks for registered exchanges first, so it must fail with a set-up error
    /// when none has been added.
    #[tokio::test]
    async fn test_no_exchanges() {
        let result = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()))
            .build()
            .await;
        assert!(
            matches!(result, Err(StreamError::SetUpError(_))),
            "Client should fail to build when no exchanges are registered."
        );
    }

    /// Builds a stream with one registered exchange, using the key from `TYCHO_AUTH_TOKEN`.
    /// Ignored by default (see the `ignore` reason below).
    #[ignore = "require tycho gateway"]
    #[tokio::test]
    async fn test_simple_build() {
        let token =
            env::var("TYCHO_AUTH_TOKEN").expect("TYCHO_AUTH_TOKEN must be set for this test");
        let result = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
            .auth_key(Some(token))
            .build()
            .await;

        assert!(result.is_ok(), "Client should build successfully with exchanges registered.");
    }
}