Skip to main content

tycho_simulation/rfq/protocols/hashflow/
client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address, U256};
8use async_trait::async_trait;
9use futures::stream::BoxStream;
10use num_bigint::BigUint;
11use reqwest::Client;
12use serde::{Deserialize, Serialize};
13use tokio::time::{interval, timeout, Duration};
14use tracing::{error, info, warn};
15use tycho_common::{
16    models::{protocol::GetAmountOutParams, Chain},
17    simulation::indicatively_priced::SignedQuote,
18    Bytes,
19};
20
21use crate::{
22    evm::protocol::u256_num::biguint_to_u256,
23    rfq::{
24        client::RFQClient,
25        errors::RFQError,
26        models::TimestampHeader,
27        protocols::hashflow::models::{
28            HashflowChain, HashflowMarketMakerLevels, HashflowMarketMakersResponse,
29            HashflowPriceLevelsResponse, HashflowQuoteRequest, HashflowQuoteResponse, HashflowRFQ,
30        },
31    },
32    tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
33    tycho_common::dto::{ProtocolComponent, ResponseProtocolState},
34};
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
37pub struct HashflowClient {
38    chain: Chain,
39    price_levels_endpoint: String,
40    market_makers_endpoint: String,
41    quote_endpoint: String,
42    // Tokens that we want prices for
43    tokens: HashSet<Bytes>,
44    // Min tvl value in the quote token.
45    tvl: f64,
46    #[serde(skip_serializing, default)]
47    auth_key: String,
48    #[serde(skip_serializing, default)]
49    auth_user: String,
50    // Quote tokens to normalize to for TVL purposes. Should have the same prices.
51    quote_tokens: HashSet<Bytes>,
52    poll_time: Duration,
53    quote_timeout: Duration,
54}
55
56impl HashflowClient {
57    pub const PROTOCOL_SYSTEM: &'static str = "rfq:hashflow";
58
59    #[allow(clippy::too_many_arguments)]
60    pub fn new(
61        chain: Chain,
62        tokens: HashSet<Bytes>,
63        tvl: f64,
64        quote_tokens: HashSet<Bytes>,
65        auth_user: String,
66        auth_key: String,
67        poll_time: Duration,
68        quote_timeout: Duration,
69    ) -> Result<Self, RFQError> {
70        Ok(Self {
71            chain,
72            price_levels_endpoint: "https://api.hashflow.com/taker/v3/price-levels".to_string(),
73            market_makers_endpoint: "https://api.hashflow.com/taker/v3/market-makers".to_string(),
74            quote_endpoint: "https://api.hashflow.com/taker/v3/rfq".to_string(),
75            tokens,
76            tvl,
77            auth_key,
78            auth_user,
79            quote_tokens,
80            poll_time,
81            quote_timeout,
82        })
83    }
84
85    /// Normalize TVL to a common quote token for comparison
86    /// Returns the normalized TVL value, or 0.0 if normalization fails due to no liquidity
87    fn normalize_tvl(
88        &self,
89        raw_tvl: f64,
90        quote_token: Bytes,
91        levels_by_mm: &HashMap<String, Vec<HashflowMarketMakerLevels>>,
92    ) -> Result<f64, RFQError> {
93        // If the quote token is already in our approved quote token set, no conversion needed
94        if self.quote_tokens.contains(&quote_token) {
95            return Ok(raw_tvl);
96        }
97
98        // Try to find the price of the quote token in one of the approved quote tokens
99        // for normalization.
100        for approved_quote_token in &self.quote_tokens {
101            for (_mm, mm_levels_inner) in levels_by_mm.iter() {
102                for quote_mm_level in mm_levels_inner {
103                    // Check for direct pair: quote_token/approved_quote_token
104                    if quote_mm_level.pair.base_token == quote_token &&
105                        quote_mm_level.pair.quote_token == *approved_quote_token
106                    {
107                        if let Some(price) = quote_mm_level.get_price(1.0) {
108                            return Ok(raw_tvl * price);
109                        }
110                    }
111                }
112            }
113        }
114
115        // If we can't normalize, return TVL 0 (pool will be filtered out)
116        Ok(0.0)
117    }
118
119    fn create_component_with_state(
120        &self,
121        component_id: String,
122        tokens: Vec<Bytes>,
123        mm_name: &str,
124        mm_level: &HashflowMarketMakerLevels,
125        tvl: f64,
126    ) -> ComponentWithState {
127        let protocol_component = ProtocolComponent {
128            id: component_id.clone(),
129            protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
130            protocol_type_name: "hashflow_pool".to_string(),
131            chain: self.chain.into(),
132            tokens,
133            contract_ids: vec![], // empty for RFQ
134            ..Default::default()
135        };
136
137        let mut attributes = HashMap::new();
138
139        // Store price levels as JSON string
140        if !mm_level.levels.is_empty() {
141            let levels_json = serde_json::to_string(&mm_level.levels).unwrap_or_default();
142            attributes.insert("levels".to_string(), levels_json.as_bytes().to_vec().into());
143        }
144        attributes.insert("mm".to_string(), mm_name.as_bytes().to_vec().into());
145
146        ComponentWithState {
147            state: ResponseProtocolState {
148                component_id: component_id.clone(),
149                attributes,
150                balances: HashMap::new(),
151            },
152            component: protocol_component,
153            component_tvl: Some(tvl),
154            entrypoints: vec![],
155        }
156    }
157
158    async fn fetch_market_makers(&mut self) -> Result<Vec<String>, RFQError> {
159        let query_params = vec![
160            ("source", self.auth_user.clone()),
161            ("baseChainType", "evm".to_string()),
162            ("baseChainId", self.chain.id().to_string()),
163        ];
164
165        let http_client = Client::new();
166        let request = http_client
167            .get(&self.market_makers_endpoint)
168            .query(&query_params)
169            .header("accept", "application/json")
170            .header("Authorization", &self.auth_key);
171
172        let response = request.send().await.map_err(|e| {
173            RFQError::ConnectionError(format!("Failed to fetch market makers: {e}"))
174        })?;
175
176        if !response.status().is_success() {
177            return Err(RFQError::ConnectionError(format!(
178                "HTTP error {}: {}",
179                response.status(),
180                response
181                    .text()
182                    .await
183                    .unwrap_or_default()
184            )));
185        }
186
187        let mm_response: HashflowMarketMakersResponse = response.json().await.map_err(|e| {
188            RFQError::ParsingError(format!("Failed to parse market makers response: {e}"))
189        })?;
190
191        info!(
192            "Fetched {} market makers: {:?}",
193            mm_response.market_makers.len(),
194            mm_response.market_makers
195        );
196
197        Ok(mm_response.market_makers)
198    }
199
200    async fn fetch_price_levels(
201        &self,
202        market_makers: &Vec<String>,
203    ) -> Result<HashMap<String, Vec<HashflowMarketMakerLevels>>, RFQError> {
204        let mut query_params = vec![
205            ("source", self.auth_user.clone()),
206            ("baseChainType", "evm".to_string()),
207            ("baseChainId", self.chain.id().to_string()),
208        ];
209
210        // Add market makers as array parameters
211        for mm in market_makers {
212            query_params.push(("marketMakers[]", mm.clone()));
213        }
214
215        let http_client = Client::new();
216        let request = http_client
217            .get(&self.price_levels_endpoint)
218            .query(&query_params)
219            .header("accept", "application/json")
220            .header("Authorization", &self.auth_key);
221
222        let response = request
223            .send()
224            .await
225            .map_err(|e| RFQError::ConnectionError(format!("Failed to fetch price levels: {e}")))?;
226
227        if !response.status().is_success() {
228            return Err(RFQError::ConnectionError(format!(
229                "HTTP error {}: {}",
230                response.status(),
231                response
232                    .text()
233                    .await
234                    .unwrap_or_default()
235            )));
236        }
237
238        let price_response: HashflowPriceLevelsResponse = response.json().await.map_err(|e| {
239            RFQError::ParsingError(format!("Failed to parse price levels response: {e}"))
240        })?;
241
242        if price_response.status != "success" {
243            return Err(RFQError::InvalidInput(format!(
244                "API returned error status: {}",
245                price_response.error.unwrap_or_default()
246            )));
247        }
248
249        price_response
250            .levels
251            .ok_or_else(|| RFQError::ParsingError("API response missing levels".to_string()))
252    }
253}
254
255#[async_trait]
256impl RFQClient for HashflowClient {
257    fn stream(
258        &self,
259    ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
260        let mut client = self.clone();
261
262        Box::pin(async_stream::stream! {
263            let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
264            let mut ticker = interval(client.poll_time);
265
266            info!("Starting Hashflow price levels polling every {} seconds", client.poll_time.as_secs());
267            info!("TVL threshold: {:.2}", client.tvl);
268
269            loop {
270                ticker.tick().await;
271
272                let market_makers;
273                match client.fetch_market_makers().await {
274                    Ok(mms) => {
275                        market_makers = mms;
276                        info!("Successfully fetched market makers");
277                    }
278                    Err(e) => {
279                        info!("Failed to fetch market makers: {}", e);
280                        continue;
281                    }
282                }
283
284                match client.fetch_price_levels(&market_makers).await {
285                    Ok(levels_by_mm) => {
286                        let mut new_components = HashMap::new();
287
288                        info!("Fetched price levels from {} market makers", levels_by_mm.len());
289                        // Process all market maker levels
290                        for (mm_name, mm_levels) in levels_by_mm.iter() {
291                            for mm_level in mm_levels {
292                                let base_token = &mm_level.pair.base_token;
293                                let quote_token = &mm_level.pair.quote_token;
294
295                                // Check if both tokens are in our tokens set
296                                if client.tokens.contains(base_token) && client.tokens.contains(quote_token) {
297                                    let tokens = vec![base_token.clone(), quote_token.clone()];
298                                    let tvl = mm_level.calculate_tvl();
299
300                                    // Apply TVL normalization if needed
301                                    let normalized_tvl = client.normalize_tvl(
302                                        tvl,
303                                        mm_level.pair.quote_token.clone(),
304                                        &levels_by_mm,
305                                    )?;
306
307                                    // Hash the pair for component id
308                                    let pair_str = format!("hashflow_{}/{}", hex::encode(base_token), hex::encode(quote_token));
309                                    let component_id = format!("{}", keccak256(pair_str.as_bytes()));
310
311                                    if normalized_tvl < client.tvl {
312                                        info!("Filtering out component {} due to low TVL: {:.2} < {:.2}",
313                                              component_id, normalized_tvl, client.tvl);
314                                        continue;
315                                    }
316
317                                    let component_with_state = client.create_component_with_state(
318                                        component_id.clone(),
319                                        tokens,
320                                        mm_name,
321                                        mm_level,
322                                        normalized_tvl
323                                    );
324                                    new_components.insert(component_id, component_with_state);
325                                }
326                            }
327                        }
328
329                        // Find components that were removed
330                        let removed_components: HashMap<String, ProtocolComponent> = current_components
331                            .iter()
332                            .filter(|&(id, _)| !new_components.contains_key(id))
333                            .map(|(k, v)| (k.clone(), v.component.clone()))
334                            .collect();
335
336                        // Update current state
337                        current_components = new_components.clone();
338
339                        let snapshot = Snapshot {
340                            states: new_components,
341                            vm_storage: HashMap::new(),
342                        };
343                        let timestamp = SystemTime::now().duration_since(
344                            SystemTime::UNIX_EPOCH
345                        ).map_err(
346                            |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
347                        )?.as_secs();
348
349                        let msg = StateSyncMessage::<TimestampHeader> {
350                            header: TimestampHeader { timestamp },
351                            snapshots: snapshot,
352                            deltas: None,
353                            removed_components,
354                        };
355
356                        yield Ok(("hashflow".to_string(), msg));
357                    },
358                    Err(e) => {
359                        error!("Failed to fetch price levels from Hashflow API: {}", e);
360                        continue;
361                    }
362                }
363            }
364        })
365    }
366
367    async fn request_binding_quote(
368        &self,
369        params: &GetAmountOutParams,
370    ) -> Result<SignedQuote, RFQError> {
371        let hashflow_chain = HashflowChain::from(self.chain);
372        let quote_request = HashflowQuoteRequest {
373            source: self.auth_user.clone(),
374            base_chain: hashflow_chain.clone(),
375            quote_chain: hashflow_chain,
376            rfqs: vec![HashflowRFQ {
377                base_token: params.token_in.to_string(),
378                quote_token: params.token_out.to_string(),
379                base_token_amount: Some(params.amount_in.to_string()),
380                quote_token_amount: None,
381                trader: params.receiver.to_string(),
382                effective_trader: None,
383            }],
384            calldata: false,
385        };
386
387        let url = self.quote_endpoint.clone();
388
389        let start_time = std::time::Instant::now();
390        const MAX_RETRIES: u32 = 3;
391        let mut last_error = None;
392
393        for attempt in 0..MAX_RETRIES {
394            // Check if we have time remaining for this attempt
395            let elapsed = start_time.elapsed();
396            if elapsed >= self.quote_timeout {
397                return Err(last_error.unwrap_or_else(|| {
398                    RFQError::ConnectionError(format!(
399                        "Hashflow quote request timed out after {} seconds",
400                        self.quote_timeout.as_secs()
401                    ))
402                }));
403            }
404
405            let remaining_time = self.quote_timeout - elapsed;
406
407            let http_client = Client::new();
408            let request = http_client
409                .post(&url)
410                .json(&quote_request)
411                .header("accept", "application/json")
412                .header("Authorization", &self.auth_key);
413
414            let response = match timeout(remaining_time, request.send()).await {
415                Ok(Ok(resp)) => resp,
416                Ok(Err(e)) => {
417                    warn!(
418                        "Hashflow quote request failed (attempt {}/{}): {}",
419                        attempt + 1,
420                        MAX_RETRIES,
421                        e
422                    );
423                    last_error = Some(RFQError::ConnectionError(format!(
424                        "Failed to send Hashflow quote request: {e}"
425                    )));
426                    if attempt < MAX_RETRIES - 1 {
427                        tokio::time::sleep(Duration::from_millis(100)).await;
428                        continue;
429                    } else {
430                        return Err(last_error.unwrap());
431                    }
432                }
433                Err(_) => {
434                    return Err(RFQError::ConnectionError(format!(
435                        "Hashflow quote request timed out after {} seconds",
436                        self.quote_timeout.as_secs()
437                    )));
438                }
439            };
440
441            if response.status() != 200 {
442                let err_msg = match response.text().await {
443                    Ok(text) => text,
444                    Err(e) => {
445                        warn!(
446                            "Hashflow error response parsing failed (attempt {}/{}): {}",
447                            attempt + 1,
448                            MAX_RETRIES,
449                            e
450                        );
451                        last_error = Some(RFQError::ParsingError(format!(
452                            "Failed to read response text from Hashflow failed request: {e}"
453                        )));
454                        if attempt < MAX_RETRIES - 1 {
455                            tokio::time::sleep(Duration::from_millis(100)).await;
456                            continue;
457                        } else {
458                            return Err(last_error.unwrap());
459                        }
460                    }
461                };
462                last_error = Some(RFQError::FatalError(format!(
463                    "Failed to send Hashflow quote request: {err_msg}",
464                )));
465                if attempt < MAX_RETRIES - 1 {
466                    warn!(
467                        "Hashflow returned non-200 status (attempt {}/{}): {}",
468                        attempt + 1,
469                        MAX_RETRIES,
470                        err_msg
471                    );
472                    tokio::time::sleep(Duration::from_millis(100)).await;
473                    continue;
474                } else {
475                    return Err(last_error.unwrap());
476                }
477            }
478
479            let quote_response = match response
480                .json::<HashflowQuoteResponse>()
481                .await
482            {
483                Ok(resp) => resp,
484                Err(e) => {
485                    warn!(
486                        "Hashflow quote response parsing failed (attempt {}/{}): {}",
487                        attempt + 1,
488                        MAX_RETRIES,
489                        e
490                    );
491                    last_error = Some(RFQError::ParsingError(format!(
492                        "Failed to parse Hashflow quote response: {e}"
493                    )));
494                    if attempt < MAX_RETRIES - 1 {
495                        tokio::time::sleep(Duration::from_millis(100)).await;
496                        continue;
497                    } else {
498                        return Err(last_error.unwrap());
499                    }
500                }
501            };
502
503            match quote_response.status.as_str() {
504                "success" => {
505                    if let Some(quotes) = quote_response.quotes {
506                        if quotes.is_empty() {
507                            return Err(RFQError::QuoteNotFound(format!(
508                                "Hashflow quote not found for {} {} ->{}",
509                                params.amount_in, params.token_in, params.token_out,
510                            )));
511                        }
512                        // We assume there will be only one quote request at a time
513                        let quote = quotes[0].clone();
514                        quote.validate(params)?;
515
516                        let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
517                        quote_attributes.insert("pool".to_string(), quote.quote_data.pool);
518                        if let Some(external_account) = quote.quote_data.external_account {
519                            quote_attributes
520                                .insert("external_account".to_string(), external_account);
521                        } else {
522                            quote_attributes.insert(
523                                "external_account".to_string(),
524                                Bytes::from_str(&Address::ZERO.to_string()).map_err(|_| {
525                                    RFQError::ParsingError(
526                                        "Failed to parse zero address".to_string(),
527                                    )
528                                })?,
529                            );
530                        }
531                        quote_attributes.insert("trader".to_string(), quote.quote_data.trader);
532                        quote_attributes
533                            .insert("base_token".to_string(), quote.quote_data.base_token);
534                        quote_attributes
535                            .insert("quote_token".to_string(), quote.quote_data.quote_token);
536                        quote_attributes.insert(
537                            "base_token_amount".to_string(),
538                            Bytes::from(
539                                biguint_to_u256(
540                                    &BigUint::from_str(&quote.quote_data.base_token_amount)
541                                        .map_err(|_| {
542                                            RFQError::ParsingError(format!(
543                                                "Failed to parse base token amount: {}",
544                                                quote.quote_data.base_token_amount
545                                            ))
546                                        })?,
547                                )
548                                .to_be_bytes::<32>()
549                                .to_vec(),
550                            ),
551                        );
552                        quote_attributes.insert(
553                            "quote_token_amount".to_string(),
554                            Bytes::from(
555                                biguint_to_u256(
556                                    &BigUint::from_str(&quote.quote_data.quote_token_amount)
557                                        .map_err(|_| {
558                                            RFQError::ParsingError(format!(
559                                                "Failed to parse quote token amount: {}",
560                                                quote.quote_data.quote_token_amount
561                                            ))
562                                        })?,
563                                )
564                                .to_be_bytes::<32>()
565                                .to_vec(),
566                            ),
567                        );
568                        quote_attributes.insert(
569                            "quote_expiry".to_string(),
570                            Bytes::from(
571                                U256::from(quote.quote_data.quote_expiry)
572                                    .to_be_bytes::<32>()
573                                    .to_vec(),
574                            ),
575                        );
576                        quote_attributes.insert(
577                            "nonce".to_string(),
578                            Bytes::from(
579                                U256::from(quote.quote_data.nonce)
580                                    .to_be_bytes::<32>()
581                                    .to_vec(),
582                            ),
583                        );
584                        quote_attributes.insert("tx_id".to_string(), quote.quote_data.tx_id);
585                        quote_attributes.insert("signature".to_string(), quote.signature);
586
587                        let signed_quote = SignedQuote {
588                            base_token: params.token_in.clone(),
589                            quote_token: params.token_out.clone(),
590                            amount_in: BigUint::from_str(&quote.quote_data.base_token_amount)
591                                .map_err(|_| {
592                                    RFQError::ParsingError(format!(
593                                        "Failed to parse amount in string: {}",
594                                        quote.quote_data.base_token_amount
595                                    ))
596                                })?,
597                            amount_out: BigUint::from_str(&quote.quote_data.quote_token_amount)
598                                .map_err(|_| {
599                                    RFQError::ParsingError(format!(
600                                        "Failed to parse amount out string: {}",
601                                        quote.quote_data.quote_token_amount
602                                    ))
603                                })?,
604                            quote_attributes,
605                        };
606                        return Ok(signed_quote);
607                    } else {
608                        return Err(RFQError::QuoteNotFound(format!(
609                            "Hashflow quote not found for {} {} ->{}",
610                            params.amount_in, params.token_in, params.token_out,
611                        )));
612                    }
613                }
614                "fail" => {
615                    return Err(RFQError::FatalError(format!(
616                        "Hashflow API error: {:?}",
617                        quote_response.error
618                    )));
619                }
620                _ => {
621                    return Err(RFQError::FatalError(
622                        "Hashflow API error: Unknown status".to_string(),
623                    ));
624                }
625            }
626        }
627
628        Err(last_error.unwrap_or_else(|| {
629            RFQError::ConnectionError("Hashflow quote request failed after retries".to_string())
630        }))
631    }
632}
633
634#[cfg(test)]
635mod tests {
636    use std::{env, str::FromStr, time::Duration};
637
638    use dotenv::dotenv;
639    use futures::StreamExt;
640    use tokio::time::timeout;
641
642    use super::*;
643    use crate::rfq::{
644        constants::get_hashflow_auth,
645        protocols::hashflow::models::{HashflowPair, HashflowPriceLevel},
646    };
647
648    #[test]
649    fn test_normalize_tvl_same_quote_token() {
650        let client = create_test_client();
651        let levels = HashMap::new();
652
653        // USDC is in our quote tokens, so no normalization should happen
654        let result = client.normalize_tvl(
655            1000.0,
656            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
657            &levels,
658        );
659        assert!(result.is_ok());
660        assert_eq!(result.unwrap(), 1000.0);
661    }
662
663    #[test]
664    fn test_normalize_tvl_different_quote_token() {
665        let client = create_test_client();
666        let mut levels = HashMap::new();
667        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
668        let usdc = Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
669
670        // Create mock levels for ETH/USDC pair for normalization
671        let eth_usdc_level = HashflowMarketMakerLevels {
672            pair: HashflowPair { base_token: weth.clone(), quote_token: usdc },
673            levels: vec![
674                HashflowPriceLevel { quantity: 1.0, price: 3000.0 }, /* 1 ETH = 3000 USDC */
675            ],
676        };
677
678        levels.insert("test_mm".to_string(), vec![eth_usdc_level]);
679
680        // Test normalizing ETH TVL to USDC
681        let result = client.normalize_tvl(2.0, weth, &levels);
682        assert!(result.is_ok());
683        // 2 ETH * 3000 USDC/ETH = 6000 USDC
684        assert_eq!(result.unwrap(), 6000.0);
685    }
686
687    #[test]
688    fn test_normalize_tvl_no_conversion_available() {
689        let client = create_test_client();
690        let levels = HashMap::new();
691        let result = client.normalize_tvl(
692            1000.0,
693            Bytes::from_str("0x1234567890123456789012345678901234567890").unwrap(),
694            &levels,
695        );
696        assert!(result.is_ok());
697        assert_eq!(result.unwrap(), 0.0);
698    }
699
700    fn create_test_client() -> HashflowClient {
701        let quote_tokens = HashSet::from([
702            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), // USDC
703            Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(), // USDT
704        ]);
705
706        HashflowClient::new(
707            Chain::Ethereum,
708            HashSet::new(),
709            1.0,
710            quote_tokens,
711            "test_user".to_string(),
712            "test_key".to_string(),
713            Duration::from_secs(5),
714            Duration::from_secs(5),
715        )
716        .unwrap()
717    }
718
719    #[tokio::test]
720    #[ignore] // Requires network access and HASHFLOW_KEY environment variable
721    async fn test_hashflow_api_polling() {
722        dotenv().expect("Missing .env file");
723        let auth = get_hashflow_auth().unwrap();
724
725        let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
726        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
727
728        let tokens = HashSet::from([wbtc, weth.clone()]);
729
730        let quote_tokens = HashSet::from([
731            Bytes::from_str("0xa0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), // USDC
732            Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), // USDT
733        ]);
734
735        let client = HashflowClient::new(
736            Chain::Ethereum,
737            tokens,
738            1.0, // $1 minimum TVL - very low to capture most pairs
739            quote_tokens,
740            auth.user,
741            auth.key,
742            Duration::from_secs(1),
743            Duration::from_secs(5),
744        )
745        .unwrap();
746
747        let mut stream = client.stream();
748
749        let result = timeout(Duration::from_secs(10), async {
750            let mut message_count = 0;
751            let max_messages = 3;
752            let mut total_components_received = 0;
753
754            while let Some(result) = stream.next().await {
755                match result {
756                    Ok((component_id, msg)) => {
757                        println!("Received message with ID: {component_id}");
758
759                        assert!(!component_id.is_empty());
760                        assert_eq!(component_id, "hashflow");
761                        assert!(msg.header.timestamp > 0);
762
763                        let snapshot = &msg.snapshots;
764                        total_components_received += snapshot.states.len();
765
766                        println!("Received {} components in this message (Total so far: {})",
767                                snapshot.states.len(), total_components_received);
768
769                        for (id, component_with_state) in &snapshot.states {
770                            let attributes = &component_with_state.state.attributes;
771                            let levels: &Bytes = attributes.get("levels").unwrap();
772                            // Check that levels exist
773                            if attributes.contains_key("levels") {
774                                println!("{levels:?}");
775                                assert!(!attributes["levels"].is_empty());
776                            }
777                            // Check that mm name exist
778                            if attributes.contains_key("mm") {
779                                assert!(!attributes["mm"].is_empty());
780                            }
781
782                            if let Some(tvl) = component_with_state.component_tvl {
783                                assert!(tvl >= 1.0);
784                                println!("Component {id} TVL: ${tvl:.2}");
785                            }
786                        }
787
788                        message_count += 1;
789                        if message_count >= max_messages {
790                            break;
791                        }
792                    }
793                    Err(e) => {
794                        panic!("Stream error: {e}");
795                    }
796                }
797            }
798
799            assert!(message_count > 0, "Should have received at least one message");
800            assert!(total_components_received >= 1, "Should have received at least 1 component with $1 TVL threshold");
801            println!("Successfully received {message_count} messages with {total_components_received} total components");
802        })
803        .await;
804
805        match result {
806            Ok(_) => println!("Test completed successfully"),
807            Err(_) => panic!("Test timed out - no messages received within 5 seconds"),
808        }
809    }
810
811    #[tokio::test]
812    #[ignore] // Requires network access and setting proper env vars
813    async fn test_request_binding_quote() {
814        let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
815        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
816
817        let auth_user = String::from("propellerheads");
818        dotenv().expect("Missing .env file");
819        let auth_key = env::var("HASHFLOW_KEY").unwrap();
820
821        let client = HashflowClient::new(
822            Chain::Ethereum,
823            HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
824            10.0,
825            HashSet::new(),
826            auth_user,
827            auth_key,
828            Duration::from_secs(0),
829            Duration::from_secs(5),
830        )
831        .unwrap();
832
833        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
834
835        let params = GetAmountOutParams {
836            amount_in: BigUint::from(1_000000000000000000u64),
837            token_in: weth.clone(),
838            token_out: wbtc.clone(),
839            sender: router.clone(),
840            receiver: router.clone(),
841        };
842        let quote = client
843            .request_binding_quote(&params)
844            .await
845            .unwrap();
846
847        assert_eq!(quote.base_token, weth);
848        assert_eq!(quote.quote_token, wbtc);
849        assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
850
851        // // Assuming the BTC - WETH price doesn't change too much at the time of running this
852        assert!(quote.amount_out > BigUint::from(3000000u64));
853
854        assert_eq!(quote.quote_attributes.len(), 11);
855        let expected_attributes = [
856            "pool",
857            "external_account",
858            "trader",
859            "base_token",
860            "quote_token",
861            "base_token_amount",
862            "quote_token_amount",
863            "quote_expiry",
864            "nonce",
865            "tx_id",
866            "signature",
867        ];
868        for attr in expected_attributes {
869            assert!(
870                quote
871                    .quote_attributes
872                    .contains_key(attr),
873                "Missing attribute: {attr}"
874            );
875        }
876        assert_eq!(
877            quote
878                .quote_attributes
879                .get("trader")
880                .unwrap(),
881            &router
882        );
883    }
884
885    /// Helper function to create a mock server that responds after a delay
886    async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
887        use tokio::{io::AsyncWriteExt, net::TcpListener};
888
889        let listener = TcpListener::bind("127.0.0.1:0")
890            .await
891            .unwrap();
892        let addr = listener.local_addr().unwrap();
893
894        let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
895
896        tokio::spawn(async move {
897            while let Ok((mut stream, _)) = listener.accept().await {
898                let json_response_clone = json_response.to_owned();
899                tokio::spawn(async move {
900                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
901                    let response = format!(
902                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
903                        json_response_clone.len(),
904                        json_response_clone
905                    );
906                    let _ = stream
907                        .write_all(response.as_bytes())
908                        .await;
909                    let _ = stream.flush().await;
910                    let _ = stream.shutdown().await;
911                });
912            }
913        });
914
915        tokio::time::sleep(Duration::from_millis(50)).await;
916        addr
917    }
918
919    fn create_test_hashflow_client(
920        quote_endpoint: String,
921        quote_timeout: Duration,
922    ) -> HashflowClient {
923        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
924        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
925
926        HashflowClient {
927            chain: Chain::Ethereum,
928            price_levels_endpoint: "http://unused/price-levels".to_string(),
929            market_makers_endpoint: "http://unused/market-makers".to_string(),
930            quote_endpoint,
931            tokens: HashSet::from([token_in, token_out]),
932            tvl: 10.0,
933            auth_key: "test_key".to_string(),
934            auth_user: "test_user".to_string(),
935            quote_tokens: HashSet::new(),
936            poll_time: Duration::from_secs(0),
937            quote_timeout,
938        }
939    }
940
941    /// Helper function to create test quote params
942    fn create_test_quote_params() -> GetAmountOutParams {
943        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
944        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
945        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
946
947        GetAmountOutParams {
948            amount_in: BigUint::from(1_000000000000000000u64),
949            token_in,
950            token_out,
951            sender: router.clone(),
952            receiver: router,
953        }
954    }
955
956    #[tokio::test]
957    async fn test_hashflow_quote_timeout() {
958        let addr = create_delayed_response_server(500).await;
959
960        // Test 1: Client with short timeout (200ms) - should timeout
961        let client_short_timeout = create_test_hashflow_client(
962            format!("http://127.0.0.1:{}/rfq", addr.port()),
963            Duration::from_millis(200),
964        );
965        let params = create_test_quote_params();
966
967        // This should timeout after 200ms
968        let start = std::time::Instant::now();
969        let result = client_short_timeout
970            .request_binding_quote(&params)
971            .await;
972        let elapsed = start.elapsed();
973
974        // Verify that we got a timeout error
975        assert!(result.is_err());
976        let err = result.unwrap_err();
977        match err {
978            RFQError::ConnectionError(msg) => {
979                assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
980            }
981            _ => panic!("Expected ConnectionError, got: {:?}", err),
982        }
983        // Should have timed out around 200ms, definitely less than 400ms
984        assert!(
985            elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
986            "Expected timeout around 200ms, got: {:?}",
987            elapsed
988        );
989
990        // Test 2: Client with long timeout (1 second) - should wait and receive response
991        // Note: With retry logic, we may need multiple attempts if the response is malformed,
992        // so we need a longer timeout to account for retries
993        let client_long_timeout = create_test_hashflow_client(
994            format!("http://127.0.0.1:{}/rfq", addr.port()),
995            Duration::from_secs(1),
996        );
997
998        // This should wait for the response (500ms)
999        let result = client_long_timeout
1000            .request_binding_quote(&params)
1001            .await;
1002
1003        // Should succeed - the server waits 500ms which is within the 1s timeout
1004        assert!(result.is_ok(), "Expected success, got: {:?}", result);
1005    }
1006
1007    /// Helper function to create a mock server that fails twice, then succeeds
1008    async fn create_retry_server() -> (std::net::SocketAddr, std::sync::Arc<std::sync::Mutex<u32>>)
1009    {
1010        use std::sync::{Arc, Mutex};
1011
1012        use tokio::{io::AsyncWriteExt, net::TcpListener};
1013
1014        let request_count = Arc::new(Mutex::new(0u32));
1015        let request_count_clone = request_count.clone();
1016
1017        let listener = TcpListener::bind("127.0.0.1:0")
1018            .await
1019            .unwrap();
1020        let addr = listener.local_addr().unwrap();
1021
1022        let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
1023
1024        tokio::spawn(async move {
1025            while let Ok((mut stream, _)) = listener.accept().await {
1026                let count_clone = request_count_clone.clone();
1027                let json_response_clone = json_response.to_owned();
1028                tokio::spawn(async move {
1029                    *count_clone.lock().unwrap() += 1;
1030                    let count = *count_clone.lock().unwrap();
1031                    println!("Mock server: Received request #{count}");
1032
1033                    if count <= 2 {
1034                        let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1035                        let _ = stream
1036                            .write_all(response.as_bytes())
1037                            .await;
1038                    } else {
1039                        let response = format!(
1040                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1041                            json_response_clone.len(),
1042                            json_response_clone
1043                        );
1044                        let _ = stream
1045                            .write_all(response.as_bytes())
1046                            .await;
1047                    }
1048                    let _ = stream.flush().await;
1049                    let _ = stream.shutdown().await;
1050                });
1051            }
1052        });
1053
1054        tokio::time::sleep(Duration::from_millis(50)).await;
1055        (addr, request_count)
1056    }
1057
1058    #[tokio::test]
1059    async fn test_hashflow_quote_retry_on_bad_response() {
1060        let (addr, request_count) = create_retry_server().await;
1061
1062        let client = create_test_hashflow_client(
1063            format!("http://127.0.0.1:{}/rfq", addr.port()),
1064            Duration::from_secs(5),
1065        );
1066        let params = create_test_quote_params();
1067        let result = client
1068            .request_binding_quote(&params)
1069            .await;
1070
1071        assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1072        let quote = result.unwrap();
1073
1074        // Verify the quote is parsed as expected
1075        assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
1076        assert_eq!(quote.amount_out, BigUint::from(3329502u64));
1077
1078        // Verify exactly 3 requests were made (2 failures + 1 success)
1079        let final_count = *request_count.lock().unwrap();
1080        assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1081    }
1082
1083    #[test]
1084    fn test_hashflow_client_serialize_deserialize_roundtrip() {
1085        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1086        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1087        let quote_token = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1088
1089        let original = HashflowClient {
1090            chain: Chain::Ethereum,
1091            price_levels_endpoint: "https://api.hashflow.com/price_levels".to_string(),
1092            market_makers_endpoint: "https://api.hashflow.com/market_makers".to_string(),
1093            quote_endpoint: "https://api.hashflow.com/quote".to_string(),
1094            tokens: HashSet::from([token_in.clone(), token_out.clone()]),
1095            tvl: 50.5,
1096            auth_key: "secret_key".to_string(),
1097            auth_user: "secret_user".to_string(),
1098            quote_tokens: HashSet::from([quote_token.clone()]),
1099            poll_time: Duration::from_secs(10),
1100            quote_timeout: Duration::from_millis(5500),
1101        };
1102
1103        let serialized = serde_json::to_string(&original).unwrap();
1104        let deserialized: HashflowClient = serde_json::from_str(&serialized).unwrap();
1105
1106        // Fields that should round-trip correctly
1107        assert_eq!(deserialized.chain, original.chain);
1108        assert_eq!(deserialized.price_levels_endpoint, original.price_levels_endpoint);
1109        assert_eq!(deserialized.market_makers_endpoint, original.market_makers_endpoint);
1110        assert_eq!(deserialized.quote_endpoint, original.quote_endpoint);
1111        assert_eq!(deserialized.tokens, original.tokens);
1112        assert_eq!(deserialized.tvl, original.tvl);
1113        assert_eq!(deserialized.quote_tokens, original.quote_tokens);
1114        assert_eq!(deserialized.poll_time, original.poll_time);
1115        assert_eq!(deserialized.quote_timeout, original.quote_timeout);
1116
1117        // auth_key and auth_user should NOT round-trip (skip_serializing + default)
1118        assert_eq!(deserialized.auth_key, "");
1119        assert_eq!(deserialized.auth_user, "");
1120        assert_ne!(deserialized.auth_key, original.auth_key);
1121        assert_ne!(deserialized.auth_user, original.auth_user);
1122    }
1123
1124    #[test]
1125    fn test_hashflow_client_deserialize_with_credentials() {
1126        // When auth_key and auth_user are provided in JSON, they should be deserialized
1127        // (skip_serializing only affects serialization, not deserialization)
1128        let json = r#"{
1129            "chain": "ethereum",
1130            "price_levels_endpoint": "https://api.hashflow.com/price_levels",
1131            "market_makers_endpoint": "https://api.hashflow.com/market_makers",
1132            "quote_endpoint": "https://api.hashflow.com/quote",
1133            "tokens": [],
1134            "tvl": 10.0,
1135            "auth_key": "provided_key",
1136            "auth_user": "provided_user",
1137            "quote_tokens": [],
1138            "poll_time": {"secs": 10, "nanos": 0},
1139            "quote_timeout": {"secs": 30, "nanos": 0}
1140        }"#;
1141
1142        let client: HashflowClient = serde_json::from_str(json).unwrap();
1143
1144        // Credentials should be deserialized from JSON
1145        assert_eq!(client.auth_key, "provided_key");
1146        assert_eq!(client.auth_user, "provided_user");
1147    }
1148}