Skip to main content

tycho_simulation/rfq/protocols/hashflow/
client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address, U256};
8use async_trait::async_trait;
9use futures::stream::BoxStream;
10use num_bigint::BigUint;
11use reqwest::Client;
12use serde::{Deserialize, Serialize};
13use tokio::time::{interval, timeout, Duration};
14use tracing::{error, info, warn};
15use tycho_common::{
16    models::{protocol::GetAmountOutParams, Chain},
17    simulation::indicatively_priced::SignedQuote,
18    Bytes,
19};
20
21use crate::{
22    evm::protocol::u256_num::biguint_to_u256,
23    rfq::{
24        client::RFQClient,
25        errors::RFQError,
26        models::TimestampHeader,
27        protocols::hashflow::models::{
28            HashflowChain, HashflowMarketMakerLevels, HashflowMarketMakersResponse,
29            HashflowPriceLevelsResponse, HashflowQuoteRequest, HashflowQuoteResponse, HashflowRFQ,
30        },
31    },
32    tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
33    tycho_common::models::protocol::{ProtocolComponent, ProtocolComponentState},
34};
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
37pub struct HashflowClient {
38    chain: Chain,
39    price_levels_endpoint: String,
40    market_makers_endpoint: String,
41    quote_endpoint: String,
42    // Tokens that we want prices for
43    tokens: HashSet<Bytes>,
44    // Min tvl value in the quote token.
45    tvl: f64,
46    #[serde(skip_serializing, default)]
47    auth_key: String,
48    #[serde(skip_serializing, default)]
49    auth_user: String,
50    // Quote tokens to normalize to for TVL purposes. Should have the same prices.
51    quote_tokens: HashSet<Bytes>,
52    poll_time: Duration,
53    quote_timeout: Duration,
54}
55
56impl HashflowClient {
57    pub const PROTOCOL_SYSTEM: &'static str = "rfq:hashflow";
58
59    #[allow(clippy::too_many_arguments)]
60    pub fn new(
61        chain: Chain,
62        tokens: HashSet<Bytes>,
63        tvl: f64,
64        quote_tokens: HashSet<Bytes>,
65        auth_user: String,
66        auth_key: String,
67        poll_time: Duration,
68        quote_timeout: Duration,
69    ) -> Result<Self, RFQError> {
70        Ok(Self {
71            chain,
72            price_levels_endpoint: "https://api.hashflow.com/taker/v3/price-levels".to_string(),
73            market_makers_endpoint: "https://api.hashflow.com/taker/v3/market-makers".to_string(),
74            quote_endpoint: "https://api.hashflow.com/taker/v3/rfq".to_string(),
75            tokens,
76            tvl,
77            auth_key,
78            auth_user,
79            quote_tokens,
80            poll_time,
81            quote_timeout,
82        })
83    }
84
85    /// Normalize TVL to a common quote token for comparison
86    /// Returns the normalized TVL value, or 0.0 if normalization fails due to no liquidity
87    fn normalize_tvl(
88        &self,
89        raw_tvl: f64,
90        quote_token: Bytes,
91        levels_by_mm: &HashMap<String, Vec<HashflowMarketMakerLevels>>,
92    ) -> Result<f64, RFQError> {
93        // If the quote token is already in our approved quote token set, no conversion needed
94        if self.quote_tokens.contains(&quote_token) {
95            return Ok(raw_tvl);
96        }
97
98        // Try to find the price of the quote token in one of the approved quote tokens
99        // for normalization.
100        for approved_quote_token in &self.quote_tokens {
101            for mm_levels_inner in levels_by_mm.values() {
102                for quote_mm_level in mm_levels_inner {
103                    // Check for direct pair: quote_token/approved_quote_token
104                    if quote_mm_level.pair.base_token == quote_token &&
105                        quote_mm_level.pair.quote_token == *approved_quote_token
106                    {
107                        if let Some(price) = quote_mm_level.get_price(1.0) {
108                            return Ok(raw_tvl * price);
109                        }
110                    }
111                }
112            }
113        }
114
115        // If we can't normalize, return TVL 0 (pool will be filtered out)
116        Ok(0.0)
117    }
118
119    fn create_component_with_state(
120        &self,
121        component_id: String,
122        tokens: Vec<Bytes>,
123        mm_name: &str,
124        mm_level: &HashflowMarketMakerLevels,
125        tvl: f64,
126    ) -> ComponentWithState {
127        let protocol_component = ProtocolComponent {
128            id: component_id.clone(),
129            protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
130            protocol_type_name: "hashflow_pool".to_string(),
131            chain: self.chain,
132            tokens,
133            contract_addresses: vec![], // empty for RFQ
134            ..Default::default()
135        };
136
137        let mut attributes = HashMap::new();
138
139        // Store price levels as JSON string
140        if !mm_level.levels.is_empty() {
141            let levels_json = serde_json::to_string(&mm_level.levels).unwrap_or_default();
142            attributes.insert("levels".to_string(), levels_json.as_bytes().to_vec().into());
143        }
144        attributes.insert("mm".to_string(), mm_name.as_bytes().to_vec().into());
145
146        ComponentWithState {
147            state: ProtocolComponentState::new(&component_id, attributes, HashMap::new()),
148            component: protocol_component,
149            component_tvl: Some(tvl),
150            entrypoints: vec![],
151        }
152    }
153
154    async fn fetch_market_makers(&mut self) -> Result<Vec<String>, RFQError> {
155        let query_params = vec![
156            ("source", self.auth_user.clone()),
157            ("baseChainType", "evm".to_string()),
158            ("baseChainId", self.chain.id().to_string()),
159        ];
160
161        let http_client = Client::new();
162        let request = http_client
163            .get(&self.market_makers_endpoint)
164            .query(&query_params)
165            .header("accept", "application/json")
166            .header("Authorization", &self.auth_key);
167
168        let response = request.send().await.map_err(|e| {
169            RFQError::ConnectionError(format!("Failed to fetch market makers: {e}"))
170        })?;
171
172        if !response.status().is_success() {
173            return Err(RFQError::ConnectionError(format!(
174                "HTTP error {}: {}",
175                response.status(),
176                response
177                    .text()
178                    .await
179                    .unwrap_or_default()
180            )));
181        }
182
183        let mm_response: HashflowMarketMakersResponse = response.json().await.map_err(|e| {
184            RFQError::ParsingError(format!("Failed to parse market makers response: {e}"))
185        })?;
186
187        info!(
188            "Fetched {} market makers: {:?}",
189            mm_response.market_makers.len(),
190            mm_response.market_makers
191        );
192
193        Ok(mm_response.market_makers)
194    }
195
196    async fn fetch_price_levels(
197        &self,
198        market_makers: &Vec<String>,
199    ) -> Result<HashMap<String, Vec<HashflowMarketMakerLevels>>, RFQError> {
200        let mut query_params = vec![
201            ("source", self.auth_user.clone()),
202            ("baseChainType", "evm".to_string()),
203            ("baseChainId", self.chain.id().to_string()),
204        ];
205
206        // Add market makers as array parameters
207        for mm in market_makers {
208            query_params.push(("marketMakers[]", mm.clone()));
209        }
210
211        let http_client = Client::new();
212        let request = http_client
213            .get(&self.price_levels_endpoint)
214            .query(&query_params)
215            .header("accept", "application/json")
216            .header("Authorization", &self.auth_key);
217
218        let response = request
219            .send()
220            .await
221            .map_err(|e| RFQError::ConnectionError(format!("Failed to fetch price levels: {e}")))?;
222
223        if !response.status().is_success() {
224            return Err(RFQError::ConnectionError(format!(
225                "HTTP error {}: {}",
226                response.status(),
227                response
228                    .text()
229                    .await
230                    .unwrap_or_default()
231            )));
232        }
233
234        let price_response: HashflowPriceLevelsResponse = response.json().await.map_err(|e| {
235            RFQError::ParsingError(format!("Failed to parse price levels response: {e}"))
236        })?;
237
238        if price_response.status != "success" {
239            return Err(RFQError::InvalidInput(format!(
240                "API returned error status: {}",
241                price_response.error.unwrap_or_default()
242            )));
243        }
244
245        price_response
246            .levels
247            .ok_or_else(|| RFQError::ParsingError("API response missing levels".to_string()))
248    }
249}
250
251#[async_trait]
252impl RFQClient for HashflowClient {
253    fn stream(
254        &self,
255    ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
256        let mut client = self.clone();
257
258        Box::pin(async_stream::stream! {
259            let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
260            let mut ticker = interval(client.poll_time);
261
262            info!("Starting Hashflow price levels polling every {} seconds", client.poll_time.as_secs());
263            info!("TVL threshold: {:.2}", client.tvl);
264
265            loop {
266                ticker.tick().await;
267
268                let market_makers;
269                match client.fetch_market_makers().await {
270                    Ok(mms) => {
271                        market_makers = mms;
272                        info!("Successfully fetched market makers");
273                    }
274                    Err(e) => {
275                        info!("Failed to fetch market makers: {}", e);
276                        continue;
277                    }
278                }
279
280                match client.fetch_price_levels(&market_makers).await {
281                    Ok(levels_by_mm) => {
282                        let mut new_components = HashMap::new();
283
284                        info!("Fetched price levels from {} market makers", levels_by_mm.len());
285                        // Process all market maker levels
286                        for (mm_name, mm_levels) in levels_by_mm.iter() {
287                            for mm_level in mm_levels {
288                                let base_token = &mm_level.pair.base_token;
289                                let quote_token = &mm_level.pair.quote_token;
290
291                                // Check if both tokens are in our tokens set
292                                if client.tokens.contains(base_token) && client.tokens.contains(quote_token) {
293                                    let tokens = vec![base_token.clone(), quote_token.clone()];
294                                    let tvl = mm_level.calculate_tvl();
295
296                                    // Apply TVL normalization if needed
297                                    let normalized_tvl = client.normalize_tvl(
298                                        tvl,
299                                        mm_level.pair.quote_token.clone(),
300                                        &levels_by_mm,
301                                    )?;
302
303                                    // Hash the pair for component id
304                                    let pair_str = format!("hashflow_{}/{}", hex::encode(base_token), hex::encode(quote_token));
305                                    let component_id = format!("{}", keccak256(pair_str.as_bytes()));
306
307                                    if normalized_tvl < client.tvl {
308                                        info!("Filtering out component {} due to low TVL: {:.2} < {:.2}",
309                                              component_id, normalized_tvl, client.tvl);
310                                        continue;
311                                    }
312
313                                    let component_with_state = client.create_component_with_state(
314                                        component_id.clone(),
315                                        tokens,
316                                        mm_name,
317                                        mm_level,
318                                        normalized_tvl
319                                    );
320                                    new_components.insert(component_id, component_with_state);
321                                }
322                            }
323                        }
324
325                        // Find components that were removed
326                        let removed_components: HashMap<String, ProtocolComponent> = current_components
327                            .iter()
328                            .filter(|&(id, _)| !new_components.contains_key(id))
329                            .map(|(k, v)| (k.clone(), v.component.clone()))
330                            .collect();
331
332                        // Update current state
333                        current_components = new_components.clone();
334
335                        let snapshot = Snapshot {
336                            states: new_components,
337                            vm_storage: HashMap::new(),
338                        };
339                        let timestamp = SystemTime::now().duration_since(
340                            SystemTime::UNIX_EPOCH
341                        ).map_err(
342                            |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
343                        )?.as_secs();
344
345                        let msg = StateSyncMessage::<TimestampHeader> {
346                            header: TimestampHeader { timestamp },
347                            snapshots: snapshot,
348                            deltas: None,
349                            removed_components,
350                        };
351
352                        yield Ok(("hashflow".to_string(), msg));
353                    },
354                    Err(e) => {
355                        error!("Failed to fetch price levels from Hashflow API: {}", e);
356                        continue;
357                    }
358                }
359            }
360        })
361    }
362
363    async fn request_binding_quote(
364        &self,
365        params: &GetAmountOutParams,
366    ) -> Result<SignedQuote, RFQError> {
367        let hashflow_chain = HashflowChain::from(self.chain);
368        let quote_request = HashflowQuoteRequest {
369            source: self.auth_user.clone(),
370            base_chain: hashflow_chain.clone(),
371            quote_chain: hashflow_chain,
372            rfqs: vec![HashflowRFQ {
373                base_token: params.token_in.to_string(),
374                quote_token: params.token_out.to_string(),
375                base_token_amount: Some(params.amount_in.to_string()),
376                quote_token_amount: None,
377                trader: params.receiver.to_string(),
378                effective_trader: None,
379            }],
380            calldata: false,
381        };
382
383        let url = self.quote_endpoint.clone();
384
385        let start_time = std::time::Instant::now();
386        const MAX_RETRIES: u32 = 3;
387        let mut last_error = None;
388
389        for attempt in 0..MAX_RETRIES {
390            // Check if we have time remaining for this attempt
391            let elapsed = start_time.elapsed();
392            if elapsed >= self.quote_timeout {
393                return Err(last_error.unwrap_or_else(|| {
394                    RFQError::ConnectionError(format!(
395                        "Hashflow quote request timed out after {} seconds",
396                        self.quote_timeout.as_secs()
397                    ))
398                }));
399            }
400
401            let remaining_time = self.quote_timeout - elapsed;
402
403            let http_client = Client::new();
404            let request = http_client
405                .post(&url)
406                .json(&quote_request)
407                .header("accept", "application/json")
408                .header("Authorization", &self.auth_key);
409
410            let response = match timeout(remaining_time, request.send()).await {
411                Ok(Ok(resp)) => resp,
412                Ok(Err(e)) => {
413                    warn!(
414                        "Hashflow quote request failed (attempt {}/{}): {}",
415                        attempt + 1,
416                        MAX_RETRIES,
417                        e
418                    );
419                    last_error = Some(RFQError::ConnectionError(format!(
420                        "Failed to send Hashflow quote request: {e}"
421                    )));
422                    if attempt < MAX_RETRIES - 1 {
423                        tokio::time::sleep(Duration::from_millis(100)).await;
424                        continue;
425                    } else {
426                        return Err(last_error.unwrap());
427                    }
428                }
429                Err(_) => {
430                    return Err(RFQError::ConnectionError(format!(
431                        "Hashflow quote request timed out after {} seconds",
432                        self.quote_timeout.as_secs()
433                    )));
434                }
435            };
436
437            if response.status() != 200 {
438                let err_msg = match response.text().await {
439                    Ok(text) => text,
440                    Err(e) => {
441                        warn!(
442                            "Hashflow error response parsing failed (attempt {}/{}): {}",
443                            attempt + 1,
444                            MAX_RETRIES,
445                            e
446                        );
447                        last_error = Some(RFQError::ParsingError(format!(
448                            "Failed to read response text from Hashflow failed request: {e}"
449                        )));
450                        if attempt < MAX_RETRIES - 1 {
451                            tokio::time::sleep(Duration::from_millis(100)).await;
452                            continue;
453                        } else {
454                            return Err(last_error.unwrap());
455                        }
456                    }
457                };
458                last_error = Some(RFQError::FatalError(format!(
459                    "Failed to send Hashflow quote request: {err_msg}",
460                )));
461                if attempt < MAX_RETRIES - 1 {
462                    warn!(
463                        "Hashflow returned non-200 status (attempt {}/{}): {}",
464                        attempt + 1,
465                        MAX_RETRIES,
466                        err_msg
467                    );
468                    tokio::time::sleep(Duration::from_millis(100)).await;
469                    continue;
470                } else {
471                    return Err(last_error.unwrap());
472                }
473            }
474
475            let quote_response = match response
476                .json::<HashflowQuoteResponse>()
477                .await
478            {
479                Ok(resp) => resp,
480                Err(e) => {
481                    warn!(
482                        "Hashflow quote response parsing failed (attempt {}/{}): {}",
483                        attempt + 1,
484                        MAX_RETRIES,
485                        e
486                    );
487                    last_error = Some(RFQError::ParsingError(format!(
488                        "Failed to parse Hashflow quote response: {e}"
489                    )));
490                    if attempt < MAX_RETRIES - 1 {
491                        tokio::time::sleep(Duration::from_millis(100)).await;
492                        continue;
493                    } else {
494                        return Err(last_error.unwrap());
495                    }
496                }
497            };
498
499            match quote_response.status.as_str() {
500                "success" => {
501                    if let Some(quotes) = quote_response.quotes {
502                        if quotes.is_empty() {
503                            return Err(RFQError::QuoteNotFound(format!(
504                                "Hashflow quote not found for {} {} ->{}",
505                                params.amount_in, params.token_in, params.token_out,
506                            )));
507                        }
508                        // We assume there will be only one quote request at a time
509                        let quote = quotes[0].clone();
510                        quote.validate(params)?;
511
512                        let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
513                        quote_attributes.insert("pool".to_string(), quote.quote_data.pool);
514                        if let Some(external_account) = quote.quote_data.external_account {
515                            quote_attributes
516                                .insert("external_account".to_string(), external_account);
517                        } else {
518                            quote_attributes.insert(
519                                "external_account".to_string(),
520                                Bytes::from_str(&Address::ZERO.to_string()).map_err(|_| {
521                                    RFQError::ParsingError(
522                                        "Failed to parse zero address".to_string(),
523                                    )
524                                })?,
525                            );
526                        }
527                        quote_attributes.insert("trader".to_string(), quote.quote_data.trader);
528                        quote_attributes
529                            .insert("base_token".to_string(), quote.quote_data.base_token);
530                        quote_attributes
531                            .insert("quote_token".to_string(), quote.quote_data.quote_token);
532                        quote_attributes.insert(
533                            "base_token_amount".to_string(),
534                            Bytes::from(
535                                biguint_to_u256(
536                                    &BigUint::from_str(&quote.quote_data.base_token_amount)
537                                        .map_err(|_| {
538                                            RFQError::ParsingError(format!(
539                                                "Failed to parse base token amount: {}",
540                                                quote.quote_data.base_token_amount
541                                            ))
542                                        })?,
543                                )
544                                .to_be_bytes::<32>()
545                                .to_vec(),
546                            ),
547                        );
548                        quote_attributes.insert(
549                            "quote_token_amount".to_string(),
550                            Bytes::from(
551                                biguint_to_u256(
552                                    &BigUint::from_str(&quote.quote_data.quote_token_amount)
553                                        .map_err(|_| {
554                                            RFQError::ParsingError(format!(
555                                                "Failed to parse quote token amount: {}",
556                                                quote.quote_data.quote_token_amount
557                                            ))
558                                        })?,
559                                )
560                                .to_be_bytes::<32>()
561                                .to_vec(),
562                            ),
563                        );
564                        quote_attributes.insert(
565                            "quote_expiry".to_string(),
566                            Bytes::from(
567                                U256::from(quote.quote_data.quote_expiry)
568                                    .to_be_bytes::<32>()
569                                    .to_vec(),
570                            ),
571                        );
572                        quote_attributes.insert(
573                            "nonce".to_string(),
574                            Bytes::from(
575                                U256::from(quote.quote_data.nonce)
576                                    .to_be_bytes::<32>()
577                                    .to_vec(),
578                            ),
579                        );
580                        quote_attributes.insert("tx_id".to_string(), quote.quote_data.tx_id);
581                        quote_attributes.insert("signature".to_string(), quote.signature);
582
583                        let signed_quote = SignedQuote {
584                            base_token: params.token_in.clone(),
585                            quote_token: params.token_out.clone(),
586                            amount_in: BigUint::from_str(&quote.quote_data.base_token_amount)
587                                .map_err(|_| {
588                                    RFQError::ParsingError(format!(
589                                        "Failed to parse amount in string: {}",
590                                        quote.quote_data.base_token_amount
591                                    ))
592                                })?,
593                            amount_out: BigUint::from_str(&quote.quote_data.quote_token_amount)
594                                .map_err(|_| {
595                                    RFQError::ParsingError(format!(
596                                        "Failed to parse amount out string: {}",
597                                        quote.quote_data.quote_token_amount
598                                    ))
599                                })?,
600                            quote_attributes,
601                        };
602                        return Ok(signed_quote);
603                    } else {
604                        return Err(RFQError::QuoteNotFound(format!(
605                            "Hashflow quote not found for {} {} ->{}",
606                            params.amount_in, params.token_in, params.token_out,
607                        )));
608                    }
609                }
610                "fail" => {
611                    return Err(RFQError::FatalError(format!(
612                        "Hashflow API error: {:?}",
613                        quote_response.error
614                    )));
615                }
616                _ => {
617                    return Err(RFQError::FatalError(
618                        "Hashflow API error: Unknown status".to_string(),
619                    ));
620                }
621            }
622        }
623
624        Err(last_error.unwrap_or_else(|| {
625            RFQError::ConnectionError("Hashflow quote request failed after retries".to_string())
626        }))
627    }
628}
629
630#[cfg(test)]
631mod tests {
632    use std::{env, str::FromStr, time::Duration};
633
634    use dotenv::dotenv;
635    use futures::StreamExt;
636    use tokio::time::timeout;
637
638    use super::*;
639    use crate::rfq::{
640        constants::get_hashflow_auth,
641        protocols::hashflow::models::{HashflowPair, HashflowPriceLevel},
642    };
643
644    #[test]
645    fn test_normalize_tvl_same_quote_token() {
646        let client = create_test_client();
647        let levels = HashMap::new();
648
649        // USDC is in our quote tokens, so no normalization should happen
650        let result = client.normalize_tvl(
651            1000.0,
652            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
653            &levels,
654        );
655        assert!(result.is_ok());
656        assert_eq!(result.unwrap(), 1000.0);
657    }
658
659    #[test]
660    fn test_normalize_tvl_different_quote_token() {
661        let client = create_test_client();
662        let mut levels = HashMap::new();
663        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
664        let usdc = Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
665
666        // Create mock levels for ETH/USDC pair for normalization
667        let eth_usdc_level = HashflowMarketMakerLevels {
668            pair: HashflowPair { base_token: weth.clone(), quote_token: usdc },
669            levels: vec![
670                HashflowPriceLevel { quantity: 1.0, price: 3000.0 }, /* 1 ETH = 3000 USDC */
671            ],
672        };
673
674        levels.insert("test_mm".to_string(), vec![eth_usdc_level]);
675
676        // Test normalizing ETH TVL to USDC
677        let result = client.normalize_tvl(2.0, weth, &levels);
678        assert!(result.is_ok());
679        // 2 ETH * 3000 USDC/ETH = 6000 USDC
680        assert_eq!(result.unwrap(), 6000.0);
681    }
682
683    #[test]
684    fn test_normalize_tvl_no_conversion_available() {
685        let client = create_test_client();
686        let levels = HashMap::new();
687        let result = client.normalize_tvl(
688            1000.0,
689            Bytes::from_str("0x1234567890123456789012345678901234567890").unwrap(),
690            &levels,
691        );
692        assert!(result.is_ok());
693        assert_eq!(result.unwrap(), 0.0);
694    }
695
696    fn create_test_client() -> HashflowClient {
697        let quote_tokens = HashSet::from([
698            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), // USDC
699            Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(), // USDT
700        ]);
701
702        HashflowClient::new(
703            Chain::Ethereum,
704            HashSet::new(),
705            1.0,
706            quote_tokens,
707            "test_user".to_string(),
708            "test_key".to_string(),
709            Duration::from_secs(5),
710            Duration::from_secs(5),
711        )
712        .unwrap()
713    }
714
715    #[tokio::test]
716    #[ignore] // Requires network access and HASHFLOW_KEY environment variable
717    async fn test_hashflow_api_polling() {
718        dotenv().expect("Missing .env file");
719        let auth = get_hashflow_auth().unwrap();
720
721        let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
722        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
723
724        let tokens = HashSet::from([wbtc, weth.clone()]);
725
726        let quote_tokens = HashSet::from([
727            Bytes::from_str("0xa0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), // USDC
728            Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), // USDT
729        ]);
730
731        let client = HashflowClient::new(
732            Chain::Ethereum,
733            tokens,
734            1.0, // $1 minimum TVL - very low to capture most pairs
735            quote_tokens,
736            auth.user,
737            auth.key,
738            Duration::from_secs(1),
739            Duration::from_secs(5),
740        )
741        .unwrap();
742
743        let mut stream = client.stream();
744
745        let result = timeout(Duration::from_secs(10), async {
746            let mut message_count = 0;
747            let max_messages = 3;
748            let mut total_components_received = 0;
749
750            while let Some(result) = stream.next().await {
751                match result {
752                    Ok((component_id, msg)) => {
753                        println!("Received message with ID: {component_id}");
754
755                        assert!(!component_id.is_empty());
756                        assert_eq!(component_id, "hashflow");
757                        assert!(msg.header.timestamp > 0);
758
759                        let snapshot = &msg.snapshots;
760                        total_components_received += snapshot.states.len();
761
762                        println!("Received {} components in this message (Total so far: {})",
763                                snapshot.states.len(), total_components_received);
764
765                        for (id, component_with_state) in &snapshot.states {
766                            let attributes = &component_with_state.state.attributes;
767                            let levels: &Bytes = attributes.get("levels").unwrap();
768                            // Check that levels exist
769                            if attributes.contains_key("levels") {
770                                println!("{levels:?}");
771                                assert!(!attributes["levels"].is_empty());
772                            }
773                            // Check that mm name exist
774                            if attributes.contains_key("mm") {
775                                assert!(!attributes["mm"].is_empty());
776                            }
777
778                            if let Some(tvl) = component_with_state.component_tvl {
779                                assert!(tvl >= 1.0);
780                                println!("Component {id} TVL: ${tvl:.2}");
781                            }
782                        }
783
784                        message_count += 1;
785                        if message_count >= max_messages {
786                            break;
787                        }
788                    }
789                    Err(e) => {
790                        panic!("Stream error: {e}");
791                    }
792                }
793            }
794
795            assert!(message_count > 0, "Should have received at least one message");
796            assert!(total_components_received >= 1, "Should have received at least 1 component with $1 TVL threshold");
797            println!("Successfully received {message_count} messages with {total_components_received} total components");
798        })
799        .await;
800
801        match result {
802            Ok(_) => println!("Test completed successfully"),
803            Err(_) => panic!("Test timed out - no messages received within 5 seconds"),
804        }
805    }
806
807    #[tokio::test]
808    #[ignore] // Requires network access and setting proper env vars
809    async fn test_request_binding_quote() {
810        let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
811        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
812
813        let auth_user = String::from("propellerheads");
814        dotenv().expect("Missing .env file");
815        let auth_key = env::var("HASHFLOW_KEY").unwrap();
816
817        let client = HashflowClient::new(
818            Chain::Ethereum,
819            HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
820            10.0,
821            HashSet::new(),
822            auth_user,
823            auth_key,
824            Duration::from_secs(0),
825            Duration::from_secs(5),
826        )
827        .unwrap();
828
829        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
830
831        let params = GetAmountOutParams {
832            amount_in: BigUint::from(1_000000000000000000u64),
833            token_in: weth.clone(),
834            token_out: wbtc.clone(),
835            sender: router.clone(),
836            receiver: router.clone(),
837        };
838        let quote = client
839            .request_binding_quote(&params)
840            .await
841            .unwrap();
842
843        assert_eq!(quote.base_token, weth);
844        assert_eq!(quote.quote_token, wbtc);
845        assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
846
847        // // Assuming the BTC - WETH price doesn't change too much at the time of running this
848        assert!(quote.amount_out > BigUint::from(3000000u64));
849
850        assert_eq!(quote.quote_attributes.len(), 11);
851        let expected_attributes = [
852            "pool",
853            "external_account",
854            "trader",
855            "base_token",
856            "quote_token",
857            "base_token_amount",
858            "quote_token_amount",
859            "quote_expiry",
860            "nonce",
861            "tx_id",
862            "signature",
863        ];
864        for attr in expected_attributes {
865            assert!(
866                quote
867                    .quote_attributes
868                    .contains_key(attr),
869                "Missing attribute: {attr}"
870            );
871        }
872        assert_eq!(
873            quote
874                .quote_attributes
875                .get("trader")
876                .unwrap(),
877            &router
878        );
879    }
880
881    /// Helper function to create a mock server that responds after a delay
882    async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
883        use tokio::{io::AsyncWriteExt, net::TcpListener};
884
885        let listener = TcpListener::bind("127.0.0.1:0")
886            .await
887            .unwrap();
888        let addr = listener.local_addr().unwrap();
889
890        let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
891
892        tokio::spawn(async move {
893            while let Ok((mut stream, _)) = listener.accept().await {
894                let json_response_clone = json_response.to_owned();
895                tokio::spawn(async move {
896                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
897                    let response = format!(
898                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
899                        json_response_clone.len(),
900                        json_response_clone
901                    );
902                    let _ = stream
903                        .write_all(response.as_bytes())
904                        .await;
905                    let _ = stream.flush().await;
906                    let _ = stream.shutdown().await;
907                });
908            }
909        });
910
911        tokio::time::sleep(Duration::from_millis(50)).await;
912        addr
913    }
914
915    fn create_test_hashflow_client(
916        quote_endpoint: String,
917        quote_timeout: Duration,
918    ) -> HashflowClient {
919        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
920        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
921
922        HashflowClient {
923            chain: Chain::Ethereum,
924            price_levels_endpoint: "http://unused/price-levels".to_string(),
925            market_makers_endpoint: "http://unused/market-makers".to_string(),
926            quote_endpoint,
927            tokens: HashSet::from([token_in, token_out]),
928            tvl: 10.0,
929            auth_key: "test_key".to_string(),
930            auth_user: "test_user".to_string(),
931            quote_tokens: HashSet::new(),
932            poll_time: Duration::from_secs(0),
933            quote_timeout,
934        }
935    }
936
937    /// Helper function to create test quote params
938    fn create_test_quote_params() -> GetAmountOutParams {
939        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
940        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
941        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
942
943        GetAmountOutParams {
944            amount_in: BigUint::from(1_000000000000000000u64),
945            token_in,
946            token_out,
947            sender: router.clone(),
948            receiver: router,
949        }
950    }
951
952    #[tokio::test]
953    async fn test_hashflow_quote_timeout() {
954        let addr = create_delayed_response_server(500).await;
955
956        // Test 1: Client with short timeout (200ms) - should timeout
957        let client_short_timeout = create_test_hashflow_client(
958            format!("http://127.0.0.1:{}/rfq", addr.port()),
959            Duration::from_millis(200),
960        );
961        let params = create_test_quote_params();
962
963        // This should timeout after 200ms
964        let start = std::time::Instant::now();
965        let result = client_short_timeout
966            .request_binding_quote(&params)
967            .await;
968        let elapsed = start.elapsed();
969
970        // Verify that we got a timeout error
971        assert!(result.is_err());
972        let err = result.unwrap_err();
973        match err {
974            RFQError::ConnectionError(msg) => {
975                assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
976            }
977            _ => panic!("Expected ConnectionError, got: {:?}", err),
978        }
979        // Should have timed out around 200ms, definitely less than 400ms
980        assert!(
981            elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
982            "Expected timeout around 200ms, got: {:?}",
983            elapsed
984        );
985
986        // Test 2: Client with long timeout (1 second) - should wait and receive response
987        // Note: With retry logic, we may need multiple attempts if the response is malformed,
988        // so we need a longer timeout to account for retries
989        let client_long_timeout = create_test_hashflow_client(
990            format!("http://127.0.0.1:{}/rfq", addr.port()),
991            Duration::from_secs(1),
992        );
993
994        // This should wait for the response (500ms)
995        let result = client_long_timeout
996            .request_binding_quote(&params)
997            .await;
998
999        // Should succeed - the server waits 500ms which is within the 1s timeout
1000        assert!(result.is_ok(), "Expected success, got: {:?}", result);
1001    }
1002
1003    /// Helper function to create a mock server that fails twice, then succeeds
1004    async fn create_retry_server() -> (std::net::SocketAddr, std::sync::Arc<std::sync::Mutex<u32>>)
1005    {
1006        use std::sync::{Arc, Mutex};
1007
1008        use tokio::{io::AsyncWriteExt, net::TcpListener};
1009
1010        let request_count = Arc::new(Mutex::new(0u32));
1011        let request_count_clone = request_count.clone();
1012
1013        let listener = TcpListener::bind("127.0.0.1:0")
1014            .await
1015            .unwrap();
1016        let addr = listener.local_addr().unwrap();
1017
1018        let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
1019
1020        tokio::spawn(async move {
1021            while let Ok((mut stream, _)) = listener.accept().await {
1022                let count_clone = request_count_clone.clone();
1023                let json_response_clone = json_response.to_owned();
1024                tokio::spawn(async move {
1025                    *count_clone.lock().unwrap() += 1;
1026                    let count = *count_clone.lock().unwrap();
1027                    println!("Mock server: Received request #{count}");
1028
1029                    if count <= 2 {
1030                        let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1031                        let _ = stream
1032                            .write_all(response.as_bytes())
1033                            .await;
1034                    } else {
1035                        let response = format!(
1036                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1037                            json_response_clone.len(),
1038                            json_response_clone
1039                        );
1040                        let _ = stream
1041                            .write_all(response.as_bytes())
1042                            .await;
1043                    }
1044                    let _ = stream.flush().await;
1045                    let _ = stream.shutdown().await;
1046                });
1047            }
1048        });
1049
1050        tokio::time::sleep(Duration::from_millis(50)).await;
1051        (addr, request_count)
1052    }
1053
1054    #[tokio::test]
1055    async fn test_hashflow_quote_retry_on_bad_response() {
1056        let (addr, request_count) = create_retry_server().await;
1057
1058        let client = create_test_hashflow_client(
1059            format!("http://127.0.0.1:{}/rfq", addr.port()),
1060            Duration::from_secs(5),
1061        );
1062        let params = create_test_quote_params();
1063        let result = client
1064            .request_binding_quote(&params)
1065            .await;
1066
1067        assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1068        let quote = result.unwrap();
1069
1070        // Verify the quote is parsed as expected
1071        assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
1072        assert_eq!(quote.amount_out, BigUint::from(3329502u64));
1073
1074        // Verify exactly 3 requests were made (2 failures + 1 success)
1075        let final_count = *request_count.lock().unwrap();
1076        assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1077    }
1078
1079    #[test]
1080    fn test_hashflow_client_serialize_deserialize_roundtrip() {
1081        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1082        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1083        let quote_token = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1084
1085        let original = HashflowClient {
1086            chain: Chain::Ethereum,
1087            price_levels_endpoint: "https://api.hashflow.com/price_levels".to_string(),
1088            market_makers_endpoint: "https://api.hashflow.com/market_makers".to_string(),
1089            quote_endpoint: "https://api.hashflow.com/quote".to_string(),
1090            tokens: HashSet::from([token_in.clone(), token_out.clone()]),
1091            tvl: 50.5,
1092            auth_key: "secret_key".to_string(),
1093            auth_user: "secret_user".to_string(),
1094            quote_tokens: HashSet::from([quote_token.clone()]),
1095            poll_time: Duration::from_secs(10),
1096            quote_timeout: Duration::from_millis(5500),
1097        };
1098
1099        let serialized = serde_json::to_string(&original).unwrap();
1100        let deserialized: HashflowClient = serde_json::from_str(&serialized).unwrap();
1101
1102        // Fields that should round-trip correctly
1103        assert_eq!(deserialized.chain, original.chain);
1104        assert_eq!(deserialized.price_levels_endpoint, original.price_levels_endpoint);
1105        assert_eq!(deserialized.market_makers_endpoint, original.market_makers_endpoint);
1106        assert_eq!(deserialized.quote_endpoint, original.quote_endpoint);
1107        assert_eq!(deserialized.tokens, original.tokens);
1108        assert_eq!(deserialized.tvl, original.tvl);
1109        assert_eq!(deserialized.quote_tokens, original.quote_tokens);
1110        assert_eq!(deserialized.poll_time, original.poll_time);
1111        assert_eq!(deserialized.quote_timeout, original.quote_timeout);
1112
1113        // auth_key and auth_user should NOT round-trip (skip_serializing + default)
1114        assert_eq!(deserialized.auth_key, "");
1115        assert_eq!(deserialized.auth_user, "");
1116        assert_ne!(deserialized.auth_key, original.auth_key);
1117        assert_ne!(deserialized.auth_user, original.auth_user);
1118    }
1119
1120    #[test]
1121    fn test_hashflow_client_deserialize_with_credentials() {
1122        // When auth_key and auth_user are provided in JSON, they should be deserialized
1123        // (skip_serializing only affects serialization, not deserialization)
1124        let json = r#"{
1125            "chain": "ethereum",
1126            "price_levels_endpoint": "https://api.hashflow.com/price_levels",
1127            "market_makers_endpoint": "https://api.hashflow.com/market_makers",
1128            "quote_endpoint": "https://api.hashflow.com/quote",
1129            "tokens": [],
1130            "tvl": 10.0,
1131            "auth_key": "provided_key",
1132            "auth_user": "provided_user",
1133            "quote_tokens": [],
1134            "poll_time": {"secs": 10, "nanos": 0},
1135            "quote_timeout": {"secs": 30, "nanos": 0}
1136        }"#;
1137
1138        let client: HashflowClient = serde_json::from_str(json).unwrap();
1139
1140        // Credentials should be deserialized from JSON
1141        assert_eq!(client.auth_key, "provided_key");
1142        assert_eq!(client.auth_user, "provided_user");
1143    }
1144}