tycho_simulation/rfq/protocols/hashflow/
client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address, U256};
8use async_trait::async_trait;
9use futures::stream::BoxStream;
10use num_bigint::BigUint;
11use reqwest::Client;
12use tokio::time::{interval, timeout, Duration};
13use tracing::{error, info, warn};
14use tycho_common::{
15    models::{protocol::GetAmountOutParams, Chain},
16    simulation::indicatively_priced::SignedQuote,
17    Bytes,
18};
19
20use crate::{
21    evm::protocol::u256_num::biguint_to_u256,
22    rfq::{
23        client::RFQClient,
24        errors::RFQError,
25        models::TimestampHeader,
26        protocols::hashflow::models::{
27            HashflowChain, HashflowMarketMakerLevels, HashflowMarketMakersResponse,
28            HashflowPriceLevelsResponse, HashflowQuoteRequest, HashflowQuoteResponse, HashflowRFQ,
29        },
30    },
31    tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
32    tycho_common::dto::{ProtocolComponent, ResponseProtocolState},
33};
34
35#[derive(Clone, Debug)]
36pub struct HashflowClient {
37    chain: Chain,
38    price_levels_endpoint: String,
39    market_makers_endpoint: String,
40    quote_endpoint: String,
41    // Tokens that we want prices for
42    tokens: HashSet<Bytes>,
43    // Min tvl value in the quote token.
44    tvl: f64,
45    auth_key: String,
46    auth_user: String,
47    // Quote tokens to normalize to for TVL purposes. Should have the same prices.
48    quote_tokens: HashSet<Bytes>,
49    poll_time: Duration,
50    quote_timeout: Duration,
51}
52
53impl HashflowClient {
54    pub const PROTOCOL_SYSTEM: &'static str = "rfq:hashflow";
55
56    #[allow(clippy::too_many_arguments)]
57    pub fn new(
58        chain: Chain,
59        tokens: HashSet<Bytes>,
60        tvl: f64,
61        quote_tokens: HashSet<Bytes>,
62        auth_user: String,
63        auth_key: String,
64        poll_time: Duration,
65        quote_timeout: Duration,
66    ) -> Result<Self, RFQError> {
67        Ok(Self {
68            chain,
69            price_levels_endpoint: "https://api.hashflow.com/taker/v3/price-levels".to_string(),
70            market_makers_endpoint: "https://api.hashflow.com/taker/v3/market-makers".to_string(),
71            quote_endpoint: "https://api.hashflow.com/taker/v3/rfq".to_string(),
72            tokens,
73            tvl,
74            auth_key,
75            auth_user,
76            quote_tokens,
77            poll_time,
78            quote_timeout,
79        })
80    }
81
82    /// Normalize TVL to a common quote token for comparison
83    /// Returns the normalized TVL value, or 0.0 if normalization fails due to no liquidity
84    fn normalize_tvl(
85        &self,
86        raw_tvl: f64,
87        quote_token: Bytes,
88        levels_by_mm: &HashMap<String, Vec<HashflowMarketMakerLevels>>,
89    ) -> Result<f64, RFQError> {
90        // If the quote token is already in our approved quote token set, no conversion needed
91        if self.quote_tokens.contains(&quote_token) {
92            return Ok(raw_tvl);
93        }
94
95        // Try to find the price of the quote token in one of the approved quote tokens
96        // for normalization.
97        for approved_quote_token in &self.quote_tokens {
98            for (_mm, mm_levels_inner) in levels_by_mm.iter() {
99                for quote_mm_level in mm_levels_inner {
100                    // Check for direct pair: quote_token/approved_quote_token
101                    if quote_mm_level.pair.base_token == quote_token &&
102                        quote_mm_level.pair.quote_token == *approved_quote_token
103                    {
104                        if let Some(price) = quote_mm_level.get_price(1.0) {
105                            return Ok(raw_tvl * price);
106                        }
107                    }
108                }
109            }
110        }
111
112        // If we can't normalize, return TVL 0 (pool will be filtered out)
113        Ok(0.0)
114    }
115
116    fn create_component_with_state(
117        &self,
118        component_id: String,
119        tokens: Vec<Bytes>,
120        mm_name: &str,
121        mm_level: &HashflowMarketMakerLevels,
122        tvl: f64,
123    ) -> ComponentWithState {
124        let protocol_component = ProtocolComponent {
125            id: component_id.clone(),
126            protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
127            protocol_type_name: "hashflow_pool".to_string(),
128            chain: self.chain.into(),
129            tokens,
130            contract_ids: vec![], // empty for RFQ
131            ..Default::default()
132        };
133
134        let mut attributes = HashMap::new();
135
136        // Store price levels as JSON string
137        if !mm_level.levels.is_empty() {
138            let levels_json = serde_json::to_string(&mm_level.levels).unwrap_or_default();
139            attributes.insert("levels".to_string(), levels_json.as_bytes().to_vec().into());
140        }
141        attributes.insert("mm".to_string(), mm_name.as_bytes().to_vec().into());
142
143        ComponentWithState {
144            state: ResponseProtocolState {
145                component_id: component_id.clone(),
146                attributes,
147                balances: HashMap::new(),
148            },
149            component: protocol_component,
150            component_tvl: Some(tvl),
151            entrypoints: vec![],
152        }
153    }
154
155    async fn fetch_market_makers(&mut self) -> Result<Vec<String>, RFQError> {
156        let query_params = vec![
157            ("source", self.auth_user.clone()),
158            ("baseChainType", "evm".to_string()),
159            ("baseChainId", self.chain.id().to_string()),
160        ];
161
162        let http_client = Client::new();
163        let request = http_client
164            .get(&self.market_makers_endpoint)
165            .query(&query_params)
166            .header("accept", "application/json")
167            .header("Authorization", &self.auth_key);
168
169        let response = request.send().await.map_err(|e| {
170            RFQError::ConnectionError(format!("Failed to fetch market makers: {e}"))
171        })?;
172
173        if !response.status().is_success() {
174            return Err(RFQError::ConnectionError(format!(
175                "HTTP error {}: {}",
176                response.status(),
177                response
178                    .text()
179                    .await
180                    .unwrap_or_default()
181            )));
182        }
183
184        let mm_response: HashflowMarketMakersResponse = response.json().await.map_err(|e| {
185            RFQError::ParsingError(format!("Failed to parse market makers response: {e}"))
186        })?;
187
188        info!(
189            "Fetched {} market makers: {:?}",
190            mm_response.market_makers.len(),
191            mm_response.market_makers
192        );
193
194        Ok(mm_response.market_makers)
195    }
196
197    async fn fetch_price_levels(
198        &self,
199        market_makers: &Vec<String>,
200    ) -> Result<HashMap<String, Vec<HashflowMarketMakerLevels>>, RFQError> {
201        let mut query_params = vec![
202            ("source", self.auth_user.clone()),
203            ("baseChainType", "evm".to_string()),
204            ("baseChainId", self.chain.id().to_string()),
205        ];
206
207        // Add market makers as array parameters
208        for mm in market_makers {
209            query_params.push(("marketMakers[]", mm.clone()));
210        }
211
212        let http_client = Client::new();
213        let request = http_client
214            .get(&self.price_levels_endpoint)
215            .query(&query_params)
216            .header("accept", "application/json")
217            .header("Authorization", &self.auth_key);
218
219        let response = request
220            .send()
221            .await
222            .map_err(|e| RFQError::ConnectionError(format!("Failed to fetch price levels: {e}")))?;
223
224        if !response.status().is_success() {
225            return Err(RFQError::ConnectionError(format!(
226                "HTTP error {}: {}",
227                response.status(),
228                response
229                    .text()
230                    .await
231                    .unwrap_or_default()
232            )));
233        }
234
235        let price_response: HashflowPriceLevelsResponse = response.json().await.map_err(|e| {
236            RFQError::ParsingError(format!("Failed to parse price levels response: {e}"))
237        })?;
238
239        if price_response.status != "success" {
240            return Err(RFQError::InvalidInput(format!(
241                "API returned error status: {}",
242                price_response.error.unwrap_or_default()
243            )));
244        }
245
246        price_response
247            .levels
248            .ok_or_else(|| RFQError::ParsingError("API response missing levels".to_string()))
249    }
250}
251
252#[async_trait]
253impl RFQClient for HashflowClient {
254    fn stream(
255        &self,
256    ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
257        let mut client = self.clone();
258
259        Box::pin(async_stream::stream! {
260            let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
261            let mut ticker = interval(client.poll_time);
262
263            info!("Starting Hashflow price levels polling every {} seconds", client.poll_time.as_secs());
264            info!("TVL threshold: {:.2}", client.tvl);
265
266            loop {
267                ticker.tick().await;
268
269                let market_makers;
270                match client.fetch_market_makers().await {
271                    Ok(mms) => {
272                        market_makers = mms;
273                        info!("Successfully fetched market makers");
274                    }
275                    Err(e) => {
276                        info!("Failed to fetch market makers: {}", e);
277                        continue;
278                    }
279                }
280
281                match client.fetch_price_levels(&market_makers).await {
282                    Ok(levels_by_mm) => {
283                        let mut new_components = HashMap::new();
284
285                        info!("Fetched price levels from {} market makers", levels_by_mm.len());
286                        // Process all market maker levels
287                        for (mm_name, mm_levels) in levels_by_mm.iter() {
288                            for mm_level in mm_levels {
289                                let base_token = &mm_level.pair.base_token;
290                                let quote_token = &mm_level.pair.quote_token;
291
292                                // Check if both tokens are in our tokens set
293                                if client.tokens.contains(base_token) && client.tokens.contains(quote_token) {
294                                    let tokens = vec![base_token.clone(), quote_token.clone()];
295                                    let tvl = mm_level.calculate_tvl();
296
297                                    // Apply TVL normalization if needed
298                                    let normalized_tvl = client.normalize_tvl(
299                                        tvl,
300                                        mm_level.pair.quote_token.clone(),
301                                        &levels_by_mm,
302                                    )?;
303
304                                    // Hash the pair for component id
305                                    let pair_str = format!("hashflow_{}/{}", hex::encode(base_token), hex::encode(quote_token));
306                                    let component_id = format!("{}", keccak256(pair_str.as_bytes()));
307
308                                    if normalized_tvl < client.tvl {
309                                        info!("Filtering out component {} due to low TVL: {:.2} < {:.2}",
310                                              component_id, normalized_tvl, client.tvl);
311                                        continue;
312                                    }
313
314                                    let component_with_state = client.create_component_with_state(
315                                        component_id.clone(),
316                                        tokens,
317                                        mm_name,
318                                        mm_level,
319                                        normalized_tvl
320                                    );
321                                    new_components.insert(component_id, component_with_state);
322                                }
323                            }
324                        }
325
326                        // Find components that were removed
327                        let removed_components: HashMap<String, ProtocolComponent> = current_components
328                            .iter()
329                            .filter(|&(id, _)| !new_components.contains_key(id))
330                            .map(|(k, v)| (k.clone(), v.component.clone()))
331                            .collect();
332
333                        // Update current state
334                        current_components = new_components.clone();
335
336                        let snapshot = Snapshot {
337                            states: new_components,
338                            vm_storage: HashMap::new(),
339                        };
340                        let timestamp = SystemTime::now().duration_since(
341                            SystemTime::UNIX_EPOCH
342                        ).map_err(
343                            |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
344                        )?.as_secs();
345
346                        let msg = StateSyncMessage::<TimestampHeader> {
347                            header: TimestampHeader { timestamp },
348                            snapshots: snapshot,
349                            deltas: None,
350                            removed_components,
351                        };
352
353                        yield Ok(("hashflow".to_string(), msg));
354                    },
355                    Err(e) => {
356                        error!("Failed to fetch price levels from Hashflow API: {}", e);
357                        continue;
358                    }
359                }
360            }
361        })
362    }
363
364    async fn request_binding_quote(
365        &self,
366        params: &GetAmountOutParams,
367    ) -> Result<SignedQuote, RFQError> {
368        let hashflow_chain = HashflowChain::from(self.chain);
369        let quote_request = HashflowQuoteRequest {
370            source: self.auth_user.clone(),
371            base_chain: hashflow_chain.clone(),
372            quote_chain: hashflow_chain,
373            rfqs: vec![HashflowRFQ {
374                base_token: params.token_in.to_string(),
375                quote_token: params.token_out.to_string(),
376                base_token_amount: Some(params.amount_in.to_string()),
377                quote_token_amount: None,
378                trader: params.receiver.to_string(),
379                effective_trader: None,
380            }],
381            calldata: false,
382        };
383
384        let url = self.quote_endpoint.clone();
385
386        let start_time = std::time::Instant::now();
387        const MAX_RETRIES: u32 = 3;
388        let mut last_error = None;
389
390        for attempt in 0..MAX_RETRIES {
391            // Check if we have time remaining for this attempt
392            let elapsed = start_time.elapsed();
393            if elapsed >= self.quote_timeout {
394                return Err(last_error.unwrap_or_else(|| {
395                    RFQError::ConnectionError(format!(
396                        "Hashflow quote request timed out after {} seconds",
397                        self.quote_timeout.as_secs()
398                    ))
399                }));
400            }
401
402            let remaining_time = self.quote_timeout - elapsed;
403
404            let http_client = Client::new();
405            let request = http_client
406                .post(&url)
407                .json(&quote_request)
408                .header("accept", "application/json")
409                .header("Authorization", &self.auth_key);
410
411            let response = match timeout(remaining_time, request.send()).await {
412                Ok(Ok(resp)) => resp,
413                Ok(Err(e)) => {
414                    warn!(
415                        "Hashflow quote request failed (attempt {}/{}): {}",
416                        attempt + 1,
417                        MAX_RETRIES,
418                        e
419                    );
420                    last_error = Some(RFQError::ConnectionError(format!(
421                        "Failed to send Hashflow quote request: {e}"
422                    )));
423                    if attempt < MAX_RETRIES - 1 {
424                        tokio::time::sleep(Duration::from_millis(100)).await;
425                        continue;
426                    } else {
427                        return Err(last_error.unwrap());
428                    }
429                }
430                Err(_) => {
431                    return Err(RFQError::ConnectionError(format!(
432                        "Hashflow quote request timed out after {} seconds",
433                        self.quote_timeout.as_secs()
434                    )));
435                }
436            };
437
438            if response.status() != 200 {
439                let err_msg = match response.text().await {
440                    Ok(text) => text,
441                    Err(e) => {
442                        warn!(
443                            "Hashflow error response parsing failed (attempt {}/{}): {}",
444                            attempt + 1,
445                            MAX_RETRIES,
446                            e
447                        );
448                        last_error = Some(RFQError::ParsingError(format!(
449                            "Failed to read response text from Hashflow failed request: {e}"
450                        )));
451                        if attempt < MAX_RETRIES - 1 {
452                            tokio::time::sleep(Duration::from_millis(100)).await;
453                            continue;
454                        } else {
455                            return Err(last_error.unwrap());
456                        }
457                    }
458                };
459                last_error = Some(RFQError::FatalError(format!(
460                    "Failed to send Hashflow quote request: {err_msg}",
461                )));
462                if attempt < MAX_RETRIES - 1 {
463                    warn!(
464                        "Hashflow returned non-200 status (attempt {}/{}): {}",
465                        attempt + 1,
466                        MAX_RETRIES,
467                        err_msg
468                    );
469                    tokio::time::sleep(Duration::from_millis(100)).await;
470                    continue;
471                } else {
472                    return Err(last_error.unwrap());
473                }
474            }
475
476            let quote_response = match response
477                .json::<HashflowQuoteResponse>()
478                .await
479            {
480                Ok(resp) => resp,
481                Err(e) => {
482                    warn!(
483                        "Hashflow quote response parsing failed (attempt {}/{}): {}",
484                        attempt + 1,
485                        MAX_RETRIES,
486                        e
487                    );
488                    last_error = Some(RFQError::ParsingError(format!(
489                        "Failed to parse Hashflow quote response: {e}"
490                    )));
491                    if attempt < MAX_RETRIES - 1 {
492                        tokio::time::sleep(Duration::from_millis(100)).await;
493                        continue;
494                    } else {
495                        return Err(last_error.unwrap());
496                    }
497                }
498            };
499
500            match quote_response.status.as_str() {
501                "success" => {
502                    if let Some(quotes) = quote_response.quotes {
503                        if quotes.is_empty() {
504                            return Err(RFQError::QuoteNotFound(format!(
505                                "Hashflow quote not found for {} {} ->{}",
506                                params.amount_in, params.token_in, params.token_out,
507                            )))
508                        }
509                        // We assume there will be only one quote request at a time
510                        let quote = quotes[0].clone();
511                        quote.validate(params)?;
512
513                        let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
514                        quote_attributes.insert("pool".to_string(), quote.quote_data.pool);
515                        if let Some(external_account) = quote.quote_data.external_account {
516                            quote_attributes
517                                .insert("external_account".to_string(), external_account);
518                        } else {
519                            quote_attributes.insert(
520                                "external_account".to_string(),
521                                Bytes::from_str(&Address::ZERO.to_string()).map_err(|_| {
522                                    RFQError::ParsingError(
523                                        "Failed to parse zero address".to_string(),
524                                    )
525                                })?,
526                            );
527                        }
528                        quote_attributes.insert("trader".to_string(), quote.quote_data.trader);
529                        quote_attributes
530                            .insert("base_token".to_string(), quote.quote_data.base_token);
531                        quote_attributes
532                            .insert("quote_token".to_string(), quote.quote_data.quote_token);
533                        quote_attributes.insert(
534                            "base_token_amount".to_string(),
535                            Bytes::from(
536                                biguint_to_u256(
537                                    &BigUint::from_str(&quote.quote_data.base_token_amount)
538                                        .map_err(|_| {
539                                            RFQError::ParsingError(format!(
540                                                "Failed to parse base token amount: {}",
541                                                quote.quote_data.base_token_amount
542                                            ))
543                                        })?,
544                                )
545                                .to_be_bytes::<32>()
546                                .to_vec(),
547                            ),
548                        );
549                        quote_attributes.insert(
550                            "quote_token_amount".to_string(),
551                            Bytes::from(
552                                biguint_to_u256(
553                                    &BigUint::from_str(&quote.quote_data.quote_token_amount)
554                                        .map_err(|_| {
555                                            RFQError::ParsingError(format!(
556                                                "Failed to parse quote token amount: {}",
557                                                quote.quote_data.quote_token_amount
558                                            ))
559                                        })?,
560                                )
561                                .to_be_bytes::<32>()
562                                .to_vec(),
563                            ),
564                        );
565                        quote_attributes.insert(
566                            "quote_expiry".to_string(),
567                            Bytes::from(
568                                U256::from(quote.quote_data.quote_expiry)
569                                    .to_be_bytes::<32>()
570                                    .to_vec(),
571                            ),
572                        );
573                        quote_attributes.insert(
574                            "nonce".to_string(),
575                            Bytes::from(
576                                U256::from(quote.quote_data.nonce)
577                                    .to_be_bytes::<32>()
578                                    .to_vec(),
579                            ),
580                        );
581                        quote_attributes.insert("tx_id".to_string(), quote.quote_data.tx_id);
582                        quote_attributes.insert("signature".to_string(), quote.signature);
583
584                        let signed_quote = SignedQuote {
585                            base_token: params.token_in.clone(),
586                            quote_token: params.token_out.clone(),
587                            amount_in: BigUint::from_str(&quote.quote_data.base_token_amount)
588                                .map_err(|_| {
589                                    RFQError::ParsingError(format!(
590                                        "Failed to parse amount in string: {}",
591                                        quote.quote_data.base_token_amount
592                                    ))
593                                })?,
594                            amount_out: BigUint::from_str(&quote.quote_data.quote_token_amount)
595                                .map_err(|_| {
596                                    RFQError::ParsingError(format!(
597                                        "Failed to parse amount out string: {}",
598                                        quote.quote_data.quote_token_amount
599                                    ))
600                                })?,
601                            quote_attributes,
602                        };
603                        return Ok(signed_quote);
604                    } else {
605                        return Err(RFQError::QuoteNotFound(format!(
606                            "Hashflow quote not found for {} {} ->{}",
607                            params.amount_in, params.token_in, params.token_out,
608                        )));
609                    }
610                }
611                "fail" => {
612                    return Err(RFQError::FatalError(format!(
613                        "Hashflow API error: {:?}",
614                        quote_response.error
615                    )));
616                }
617                _ => {
618                    return Err(RFQError::FatalError(
619                        "Hashflow API error: Unknown status".to_string(),
620                    ));
621                }
622            }
623        }
624
625        Err(last_error.unwrap_or_else(|| {
626            RFQError::ConnectionError("Hashflow quote request failed after retries".to_string())
627        }))
628    }
629}
630
631#[cfg(test)]
632mod tests {
633    use std::{env, str::FromStr, time::Duration};
634
635    use dotenv::dotenv;
636    use futures::StreamExt;
637    use tokio::time::timeout;
638
639    use super::*;
640    use crate::rfq::{
641        constants::get_hashflow_auth,
642        protocols::hashflow::models::{HashflowPair, HashflowPriceLevel},
643    };
644
645    #[test]
646    fn test_normalize_tvl_same_quote_token() {
647        let client = create_test_client();
648        let levels = HashMap::new();
649
650        // USDC is in our quote tokens, so no normalization should happen
651        let result = client.normalize_tvl(
652            1000.0,
653            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
654            &levels,
655        );
656        assert!(result.is_ok());
657        assert_eq!(result.unwrap(), 1000.0);
658    }
659
660    #[test]
661    fn test_normalize_tvl_different_quote_token() {
662        let client = create_test_client();
663        let mut levels = HashMap::new();
664        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
665        let usdc = Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
666
667        // Create mock levels for ETH/USDC pair for normalization
668        let eth_usdc_level = HashflowMarketMakerLevels {
669            pair: HashflowPair { base_token: weth.clone(), quote_token: usdc },
670            levels: vec![
671                HashflowPriceLevel { quantity: 1.0, price: 3000.0 }, /* 1 ETH = 3000 USDC */
672            ],
673        };
674
675        levels.insert("test_mm".to_string(), vec![eth_usdc_level]);
676
677        // Test normalizing ETH TVL to USDC
678        let result = client.normalize_tvl(2.0, weth, &levels);
679        assert!(result.is_ok());
680        // 2 ETH * 3000 USDC/ETH = 6000 USDC
681        assert_eq!(result.unwrap(), 6000.0);
682    }
683
684    #[test]
685    fn test_normalize_tvl_no_conversion_available() {
686        let client = create_test_client();
687        let levels = HashMap::new();
688        let result = client.normalize_tvl(
689            1000.0,
690            Bytes::from_str("0x1234567890123456789012345678901234567890").unwrap(),
691            &levels,
692        );
693        assert!(result.is_ok());
694        assert_eq!(result.unwrap(), 0.0);
695    }
696
697    fn create_test_client() -> HashflowClient {
698        let quote_tokens = HashSet::from([
699            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), // USDC
700            Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(), // USDT
701        ]);
702
703        HashflowClient::new(
704            Chain::Ethereum,
705            HashSet::new(),
706            1.0,
707            quote_tokens,
708            "test_user".to_string(),
709            "test_key".to_string(),
710            Duration::from_secs(5),
711            Duration::from_secs(5),
712        )
713        .unwrap()
714    }
715
716    #[tokio::test]
717    #[ignore] // Requires network access and HASHFLOW_KEY environment variable
718    async fn test_hashflow_api_polling() {
719        dotenv().expect("Missing .env file");
720        let auth = get_hashflow_auth().unwrap();
721
722        let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
723        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
724
725        let tokens = HashSet::from([wbtc, weth.clone()]);
726
727        let quote_tokens = HashSet::from([
728            Bytes::from_str("0xa0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), // USDC
729            Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), // USDT
730        ]);
731
732        let client = HashflowClient::new(
733            Chain::Ethereum,
734            tokens,
735            1.0, // $1 minimum TVL - very low to capture most pairs
736            quote_tokens,
737            auth.user,
738            auth.key,
739            Duration::from_secs(1),
740            Duration::from_secs(5),
741        )
742        .unwrap();
743
744        let mut stream = client.stream();
745
746        let result = timeout(Duration::from_secs(10), async {
747            let mut message_count = 0;
748            let max_messages = 3;
749            let mut total_components_received = 0;
750
751            while let Some(result) = stream.next().await {
752                match result {
753                    Ok((component_id, msg)) => {
754                        println!("Received message with ID: {component_id}");
755
756                        assert!(!component_id.is_empty());
757                        assert_eq!(component_id, "hashflow");
758                        assert!(msg.header.timestamp > 0);
759
760                        let snapshot = &msg.snapshots;
761                        total_components_received += snapshot.states.len();
762
763                        println!("Received {} components in this message (Total so far: {})", 
764                                snapshot.states.len(), total_components_received);
765
766                        for (id, component_with_state) in &snapshot.states {
767                            let attributes = &component_with_state.state.attributes;
768                            let levels: &Bytes = attributes.get("levels").unwrap();
769                            // Check that levels exist
770                            if attributes.contains_key("levels") {
771                                println!("{levels:?}");
772                                assert!(!attributes["levels"].is_empty());
773                            }
774                            // Check that mm name exist
775                            if attributes.contains_key("mm") {
776                                assert!(!attributes["mm"].is_empty());
777                            }
778
779                            if let Some(tvl) = component_with_state.component_tvl {
780                                assert!(tvl >= 1.0);
781                                println!("Component {id} TVL: ${tvl:.2}");
782                            }
783                        }
784
785                        message_count += 1;
786                        if message_count >= max_messages {
787                            break;
788                        }
789                    }
790                    Err(e) => {
791                        panic!("Stream error: {e}");
792                    }
793                }
794            }
795
796            assert!(message_count > 0, "Should have received at least one message");
797            assert!(total_components_received >= 1, "Should have received at least 1 component with $1 TVL threshold");
798            println!("Successfully received {message_count} messages with {total_components_received} total components");
799        })
800        .await;
801
802        match result {
803            Ok(_) => println!("Test completed successfully"),
804            Err(_) => panic!("Test timed out - no messages received within 5 seconds"),
805        }
806    }
807
808    #[tokio::test]
809    #[ignore] // Requires network access and setting proper env vars
810    async fn test_request_binding_quote() {
811        let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
812        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
813
814        let auth_user = String::from("propellerheads");
815        dotenv().expect("Missing .env file");
816        let auth_key = env::var("HASHFLOW_KEY").unwrap();
817
818        let client = HashflowClient::new(
819            Chain::Ethereum,
820            HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
821            10.0,
822            HashSet::new(),
823            auth_user,
824            auth_key,
825            Duration::from_secs(0),
826            Duration::from_secs(5),
827        )
828        .unwrap();
829
830        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
831
832        let params = GetAmountOutParams {
833            amount_in: BigUint::from(1_000000000000000000u64),
834            token_in: weth.clone(),
835            token_out: wbtc.clone(),
836            sender: router.clone(),
837            receiver: router.clone(),
838        };
839        let quote = client
840            .request_binding_quote(&params)
841            .await
842            .unwrap();
843
844        assert_eq!(quote.base_token, weth);
845        assert_eq!(quote.quote_token, wbtc);
846        assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
847
848        // // Assuming the BTC - WETH price doesn't change too much at the time of running this
849        assert!(quote.amount_out > BigUint::from(3000000u64));
850
851        assert_eq!(quote.quote_attributes.len(), 11);
852        let expected_attributes = [
853            "pool",
854            "external_account",
855            "trader",
856            "base_token",
857            "quote_token",
858            "base_token_amount",
859            "quote_token_amount",
860            "quote_expiry",
861            "nonce",
862            "tx_id",
863            "signature",
864        ];
865        for attr in expected_attributes {
866            assert!(
867                quote
868                    .quote_attributes
869                    .contains_key(attr),
870                "Missing attribute: {attr}"
871            );
872        }
873        assert_eq!(
874            quote
875                .quote_attributes
876                .get("trader")
877                .unwrap(),
878            &router
879        );
880    }
881
882    /// Helper function to create a mock server that responds after a delay
883    async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
884        use tokio::{io::AsyncWriteExt, net::TcpListener};
885
886        let listener = TcpListener::bind("127.0.0.1:0")
887            .await
888            .unwrap();
889        let addr = listener.local_addr().unwrap();
890
891        let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
892
893        tokio::spawn(async move {
894            while let Ok((mut stream, _)) = listener.accept().await {
895                let json_response_clone = json_response.to_owned();
896                tokio::spawn(async move {
897                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
898                    let response = format!(
899                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
900                        json_response_clone.len(),
901                        json_response_clone
902                    );
903                    let _ = stream
904                        .write_all(response.as_bytes())
905                        .await;
906                    let _ = stream.flush().await;
907                    let _ = stream.shutdown().await;
908                });
909            }
910        });
911
912        tokio::time::sleep(Duration::from_millis(50)).await;
913        addr
914    }
915
916    fn create_test_hashflow_client(
917        quote_endpoint: String,
918        quote_timeout: Duration,
919    ) -> HashflowClient {
920        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
921        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
922
923        HashflowClient {
924            chain: Chain::Ethereum,
925            price_levels_endpoint: "http://unused/price-levels".to_string(),
926            market_makers_endpoint: "http://unused/market-makers".to_string(),
927            quote_endpoint,
928            tokens: HashSet::from([token_in, token_out]),
929            tvl: 10.0,
930            auth_key: "test_key".to_string(),
931            auth_user: "test_user".to_string(),
932            quote_tokens: HashSet::new(),
933            poll_time: Duration::from_secs(0),
934            quote_timeout,
935        }
936    }
937
938    /// Helper function to create test quote params
939    fn create_test_quote_params() -> GetAmountOutParams {
940        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
941        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
942        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
943
944        GetAmountOutParams {
945            amount_in: BigUint::from(1_000000000000000000u64),
946            token_in,
947            token_out,
948            sender: router.clone(),
949            receiver: router,
950        }
951    }
952
953    #[tokio::test]
954    async fn test_hashflow_quote_timeout() {
955        let addr = create_delayed_response_server(500).await;
956
957        // Test 1: Client with short timeout (200ms) - should timeout
958        let client_short_timeout = create_test_hashflow_client(
959            format!("http://127.0.0.1:{}/rfq", addr.port()),
960            Duration::from_millis(200),
961        );
962        let params = create_test_quote_params();
963
964        // This should timeout after 200ms
965        let start = std::time::Instant::now();
966        let result = client_short_timeout
967            .request_binding_quote(&params)
968            .await;
969        let elapsed = start.elapsed();
970
971        // Verify that we got a timeout error
972        assert!(result.is_err());
973        let err = result.unwrap_err();
974        match err {
975            RFQError::ConnectionError(msg) => {
976                assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
977            }
978            _ => panic!("Expected ConnectionError, got: {:?}", err),
979        }
980        // Should have timed out around 200ms, definitely less than 400ms
981        assert!(
982            elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
983            "Expected timeout around 200ms, got: {:?}",
984            elapsed
985        );
986
987        // Test 2: Client with long timeout (1 second) - should wait and receive response
988        // Note: With retry logic, we may need multiple attempts if the response is malformed,
989        // so we need a longer timeout to account for retries
990        let client_long_timeout = create_test_hashflow_client(
991            format!("http://127.0.0.1:{}/rfq", addr.port()),
992            Duration::from_secs(1),
993        );
994
995        // This should wait for the response (500ms)
996        let result = client_long_timeout
997            .request_binding_quote(&params)
998            .await;
999
1000        // Should succeed - the server waits 500ms which is within the 1s timeout
1001        assert!(result.is_ok(), "Expected success, got: {:?}", result);
1002    }
1003
1004    /// Helper function to create a mock server that fails twice, then succeeds
1005    async fn create_retry_server() -> (std::net::SocketAddr, std::sync::Arc<std::sync::Mutex<u32>>)
1006    {
1007        use std::sync::{Arc, Mutex};
1008
1009        use tokio::{io::AsyncWriteExt, net::TcpListener};
1010
1011        let request_count = Arc::new(Mutex::new(0u32));
1012        let request_count_clone = request_count.clone();
1013
1014        let listener = TcpListener::bind("127.0.0.1:0")
1015            .await
1016            .unwrap();
1017        let addr = listener.local_addr().unwrap();
1018
1019        let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
1020
1021        tokio::spawn(async move {
1022            while let Ok((mut stream, _)) = listener.accept().await {
1023                let count_clone = request_count_clone.clone();
1024                let json_response_clone = json_response.to_owned();
1025                tokio::spawn(async move {
1026                    *count_clone.lock().unwrap() += 1;
1027                    let count = *count_clone.lock().unwrap();
1028                    println!("Mock server: Received request #{count}");
1029
1030                    if count <= 2 {
1031                        let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1032                        let _ = stream
1033                            .write_all(response.as_bytes())
1034                            .await;
1035                    } else {
1036                        let response = format!(
1037                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1038                            json_response_clone.len(),
1039                            json_response_clone
1040                        );
1041                        let _ = stream
1042                            .write_all(response.as_bytes())
1043                            .await;
1044                    }
1045                    let _ = stream.flush().await;
1046                    let _ = stream.shutdown().await;
1047                });
1048            }
1049        });
1050
1051        tokio::time::sleep(Duration::from_millis(50)).await;
1052        (addr, request_count)
1053    }
1054
1055    #[tokio::test]
1056    async fn test_hashflow_quote_retry_on_bad_response() {
1057        let (addr, request_count) = create_retry_server().await;
1058
1059        let client = create_test_hashflow_client(
1060            format!("http://127.0.0.1:{}/rfq", addr.port()),
1061            Duration::from_secs(5),
1062        );
1063        let params = create_test_quote_params();
1064        let result = client
1065            .request_binding_quote(&params)
1066            .await;
1067
1068        assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1069        let quote = result.unwrap();
1070
1071        // Verify the quote is parsed as expected
1072        assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
1073        assert_eq!(quote.amount_out, BigUint::from(3329502u64));
1074
1075        // Verify exactly 3 requests were made (2 failures + 1 success)
1076        let final_count = *request_count.lock().unwrap();
1077        assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1078    }
1079}