1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address, U256};
8use async_trait::async_trait;
9use futures::stream::BoxStream;
10use num_bigint::BigUint;
11use reqwest::Client;
12use tokio::time::{interval, timeout, Duration};
13use tracing::{error, info, warn};
14use tycho_common::{
15 models::{protocol::GetAmountOutParams, Chain},
16 simulation::indicatively_priced::SignedQuote,
17 Bytes,
18};
19
20use crate::{
21 evm::protocol::u256_num::biguint_to_u256,
22 rfq::{
23 client::RFQClient,
24 errors::RFQError,
25 models::TimestampHeader,
26 protocols::hashflow::models::{
27 HashflowChain, HashflowMarketMakerLevels, HashflowMarketMakersResponse,
28 HashflowPriceLevelsResponse, HashflowQuoteRequest, HashflowQuoteResponse, HashflowRFQ,
29 },
30 },
31 tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
32 tycho_common::dto::{ProtocolComponent, ResponseProtocolState},
33};
34
/// Client for the Hashflow RFQ taker API.
///
/// Bundles the endpoints, credentials and filtering settings used both when polling
/// indicative price levels and when requesting binding quotes.
#[derive(Clone, Debug)]
pub struct HashflowClient {
    /// Chain the client operates on; its id is sent as `baseChainId` and it is attached to
    /// every emitted component.
    chain: Chain,
    /// URL queried for indicative price levels.
    price_levels_endpoint: String,
    /// URL queried for the list of market makers.
    market_makers_endpoint: String,
    /// URL that binding quote requests are posted to.
    quote_endpoint: String,
    /// Tokens of interest: a pair is only emitted when both its base and quote token are in
    /// this set.
    tokens: HashSet<Bytes>,
    /// Minimum normalized TVL; components below this threshold are filtered out.
    tvl: f64,
    /// Value sent in the `Authorization` header of every request.
    auth_key: String,
    /// Value sent as the `source` of every request.
    auth_user: String,
    /// Approved quote tokens that TVL values are normalized into.
    quote_tokens: HashSet<Bytes>,
    /// Period between two polls of the price-levels API.
    poll_time: Duration,
    /// Total time budget for a binding quote request, retries included.
    quote_timeout: Duration,
}
52
53impl HashflowClient {
    /// Protocol system identifier set on every `ProtocolComponent` this client builds.
    pub const PROTOCOL_SYSTEM: &'static str = "rfq:hashflow";
55
56 #[allow(clippy::too_many_arguments)]
57 pub fn new(
58 chain: Chain,
59 tokens: HashSet<Bytes>,
60 tvl: f64,
61 quote_tokens: HashSet<Bytes>,
62 auth_user: String,
63 auth_key: String,
64 poll_time: Duration,
65 quote_timeout: Duration,
66 ) -> Result<Self, RFQError> {
67 Ok(Self {
68 chain,
69 price_levels_endpoint: "https://api.hashflow.com/taker/v3/price-levels".to_string(),
70 market_makers_endpoint: "https://api.hashflow.com/taker/v3/market-makers".to_string(),
71 quote_endpoint: "https://api.hashflow.com/taker/v3/rfq".to_string(),
72 tokens,
73 tvl,
74 auth_key,
75 auth_user,
76 quote_tokens,
77 poll_time,
78 quote_timeout,
79 })
80 }
81
82 fn normalize_tvl(
85 &self,
86 raw_tvl: f64,
87 quote_token: Bytes,
88 levels_by_mm: &HashMap<String, Vec<HashflowMarketMakerLevels>>,
89 ) -> Result<f64, RFQError> {
90 if self.quote_tokens.contains("e_token) {
92 return Ok(raw_tvl);
93 }
94
95 for approved_quote_token in &self.quote_tokens {
98 for (_mm, mm_levels_inner) in levels_by_mm.iter() {
99 for quote_mm_level in mm_levels_inner {
100 if quote_mm_level.pair.base_token == quote_token &&
102 quote_mm_level.pair.quote_token == *approved_quote_token
103 {
104 if let Some(price) = quote_mm_level.get_price(1.0) {
105 return Ok(raw_tvl * price);
106 }
107 }
108 }
109 }
110 }
111
112 Ok(0.0)
114 }
115
116 fn create_component_with_state(
117 &self,
118 component_id: String,
119 tokens: Vec<Bytes>,
120 mm_name: &str,
121 mm_level: &HashflowMarketMakerLevels,
122 tvl: f64,
123 ) -> ComponentWithState {
124 let protocol_component = ProtocolComponent {
125 id: component_id.clone(),
126 protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
127 protocol_type_name: "hashflow_pool".to_string(),
128 chain: self.chain.into(),
129 tokens,
130 contract_ids: vec![], ..Default::default()
132 };
133
134 let mut attributes = HashMap::new();
135
136 if !mm_level.levels.is_empty() {
138 let levels_json = serde_json::to_string(&mm_level.levels).unwrap_or_default();
139 attributes.insert("levels".to_string(), levels_json.as_bytes().to_vec().into());
140 }
141 attributes.insert("mm".to_string(), mm_name.as_bytes().to_vec().into());
142
143 ComponentWithState {
144 state: ResponseProtocolState {
145 component_id: component_id.clone(),
146 attributes,
147 balances: HashMap::new(),
148 },
149 component: protocol_component,
150 component_tvl: Some(tvl),
151 entrypoints: vec![],
152 }
153 }
154
155 async fn fetch_market_makers(&mut self) -> Result<Vec<String>, RFQError> {
156 let query_params = vec![
157 ("source", self.auth_user.clone()),
158 ("baseChainType", "evm".to_string()),
159 ("baseChainId", self.chain.id().to_string()),
160 ];
161
162 let http_client = Client::new();
163 let request = http_client
164 .get(&self.market_makers_endpoint)
165 .query(&query_params)
166 .header("accept", "application/json")
167 .header("Authorization", &self.auth_key);
168
169 let response = request.send().await.map_err(|e| {
170 RFQError::ConnectionError(format!("Failed to fetch market makers: {e}"))
171 })?;
172
173 if !response.status().is_success() {
174 return Err(RFQError::ConnectionError(format!(
175 "HTTP error {}: {}",
176 response.status(),
177 response
178 .text()
179 .await
180 .unwrap_or_default()
181 )));
182 }
183
184 let mm_response: HashflowMarketMakersResponse = response.json().await.map_err(|e| {
185 RFQError::ParsingError(format!("Failed to parse market makers response: {e}"))
186 })?;
187
188 info!(
189 "Fetched {} market makers: {:?}",
190 mm_response.market_makers.len(),
191 mm_response.market_makers
192 );
193
194 Ok(mm_response.market_makers)
195 }
196
197 async fn fetch_price_levels(
198 &self,
199 market_makers: &Vec<String>,
200 ) -> Result<HashMap<String, Vec<HashflowMarketMakerLevels>>, RFQError> {
201 let mut query_params = vec![
202 ("source", self.auth_user.clone()),
203 ("baseChainType", "evm".to_string()),
204 ("baseChainId", self.chain.id().to_string()),
205 ];
206
207 for mm in market_makers {
209 query_params.push(("marketMakers[]", mm.clone()));
210 }
211
212 let http_client = Client::new();
213 let request = http_client
214 .get(&self.price_levels_endpoint)
215 .query(&query_params)
216 .header("accept", "application/json")
217 .header("Authorization", &self.auth_key);
218
219 let response = request
220 .send()
221 .await
222 .map_err(|e| RFQError::ConnectionError(format!("Failed to fetch price levels: {e}")))?;
223
224 if !response.status().is_success() {
225 return Err(RFQError::ConnectionError(format!(
226 "HTTP error {}: {}",
227 response.status(),
228 response
229 .text()
230 .await
231 .unwrap_or_default()
232 )));
233 }
234
235 let price_response: HashflowPriceLevelsResponse = response.json().await.map_err(|e| {
236 RFQError::ParsingError(format!("Failed to parse price levels response: {e}"))
237 })?;
238
239 if price_response.status != "success" {
240 return Err(RFQError::InvalidInput(format!(
241 "API returned error status: {}",
242 price_response.error.unwrap_or_default()
243 )));
244 }
245
246 price_response
247 .levels
248 .ok_or_else(|| RFQError::ParsingError("API response missing levels".to_string()))
249 }
250}
251
252#[async_trait]
253impl RFQClient for HashflowClient {
254 fn stream(
255 &self,
256 ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
257 let mut client = self.clone();
258
259 Box::pin(async_stream::stream! {
260 let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
261 let mut ticker = interval(client.poll_time);
262
263 info!("Starting Hashflow price levels polling every {} seconds", client.poll_time.as_secs());
264 info!("TVL threshold: {:.2}", client.tvl);
265
266 loop {
267 ticker.tick().await;
268
269 let market_makers;
270 match client.fetch_market_makers().await {
271 Ok(mms) => {
272 market_makers = mms;
273 info!("Successfully fetched market makers");
274 }
275 Err(e) => {
276 info!("Failed to fetch market makers: {}", e);
277 continue;
278 }
279 }
280
281 match client.fetch_price_levels(&market_makers).await {
282 Ok(levels_by_mm) => {
283 let mut new_components = HashMap::new();
284
285 info!("Fetched price levels from {} market makers", levels_by_mm.len());
286 for (mm_name, mm_levels) in levels_by_mm.iter() {
288 for mm_level in mm_levels {
289 let base_token = &mm_level.pair.base_token;
290 let quote_token = &mm_level.pair.quote_token;
291
292 if client.tokens.contains(base_token) && client.tokens.contains(quote_token) {
294 let tokens = vec![base_token.clone(), quote_token.clone()];
295 let tvl = mm_level.calculate_tvl();
296
297 let normalized_tvl = client.normalize_tvl(
299 tvl,
300 mm_level.pair.quote_token.clone(),
301 &levels_by_mm,
302 )?;
303
304 let pair_str = format!("hashflow_{}/{}", hex::encode(base_token), hex::encode(quote_token));
306 let component_id = format!("{}", keccak256(pair_str.as_bytes()));
307
308 if normalized_tvl < client.tvl {
309 info!("Filtering out component {} due to low TVL: {:.2} < {:.2}",
310 component_id, normalized_tvl, client.tvl);
311 continue;
312 }
313
314 let component_with_state = client.create_component_with_state(
315 component_id.clone(),
316 tokens,
317 mm_name,
318 mm_level,
319 normalized_tvl
320 );
321 new_components.insert(component_id, component_with_state);
322 }
323 }
324 }
325
326 let removed_components: HashMap<String, ProtocolComponent> = current_components
328 .iter()
329 .filter(|&(id, _)| !new_components.contains_key(id))
330 .map(|(k, v)| (k.clone(), v.component.clone()))
331 .collect();
332
333 current_components = new_components.clone();
335
336 let snapshot = Snapshot {
337 states: new_components,
338 vm_storage: HashMap::new(),
339 };
340 let timestamp = SystemTime::now().duration_since(
341 SystemTime::UNIX_EPOCH
342 ).map_err(
343 |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
344 )?.as_secs();
345
346 let msg = StateSyncMessage::<TimestampHeader> {
347 header: TimestampHeader { timestamp },
348 snapshots: snapshot,
349 deltas: None,
350 removed_components,
351 };
352
353 yield Ok(("hashflow".to_string(), msg));
354 },
355 Err(e) => {
356 error!("Failed to fetch price levels from Hashflow API: {}", e);
357 continue;
358 }
359 }
360 }
361 })
362 }
363
364 async fn request_binding_quote(
365 &self,
366 params: &GetAmountOutParams,
367 ) -> Result<SignedQuote, RFQError> {
368 let hashflow_chain = HashflowChain::from(self.chain);
369 let quote_request = HashflowQuoteRequest {
370 source: self.auth_user.clone(),
371 base_chain: hashflow_chain.clone(),
372 quote_chain: hashflow_chain,
373 rfqs: vec![HashflowRFQ {
374 base_token: params.token_in.to_string(),
375 quote_token: params.token_out.to_string(),
376 base_token_amount: Some(params.amount_in.to_string()),
377 quote_token_amount: None,
378 trader: params.receiver.to_string(),
379 effective_trader: None,
380 }],
381 calldata: false,
382 };
383
384 let url = self.quote_endpoint.clone();
385
386 let start_time = std::time::Instant::now();
387 const MAX_RETRIES: u32 = 3;
388 let mut last_error = None;
389
390 for attempt in 0..MAX_RETRIES {
391 let elapsed = start_time.elapsed();
393 if elapsed >= self.quote_timeout {
394 return Err(last_error.unwrap_or_else(|| {
395 RFQError::ConnectionError(format!(
396 "Hashflow quote request timed out after {} seconds",
397 self.quote_timeout.as_secs()
398 ))
399 }));
400 }
401
402 let remaining_time = self.quote_timeout - elapsed;
403
404 let http_client = Client::new();
405 let request = http_client
406 .post(&url)
407 .json("e_request)
408 .header("accept", "application/json")
409 .header("Authorization", &self.auth_key);
410
411 let response = match timeout(remaining_time, request.send()).await {
412 Ok(Ok(resp)) => resp,
413 Ok(Err(e)) => {
414 warn!(
415 "Hashflow quote request failed (attempt {}/{}): {}",
416 attempt + 1,
417 MAX_RETRIES,
418 e
419 );
420 last_error = Some(RFQError::ConnectionError(format!(
421 "Failed to send Hashflow quote request: {e}"
422 )));
423 if attempt < MAX_RETRIES - 1 {
424 tokio::time::sleep(Duration::from_millis(100)).await;
425 continue;
426 } else {
427 return Err(last_error.unwrap());
428 }
429 }
430 Err(_) => {
431 return Err(RFQError::ConnectionError(format!(
432 "Hashflow quote request timed out after {} seconds",
433 self.quote_timeout.as_secs()
434 )));
435 }
436 };
437
438 if response.status() != 200 {
439 let err_msg = match response.text().await {
440 Ok(text) => text,
441 Err(e) => {
442 warn!(
443 "Hashflow error response parsing failed (attempt {}/{}): {}",
444 attempt + 1,
445 MAX_RETRIES,
446 e
447 );
448 last_error = Some(RFQError::ParsingError(format!(
449 "Failed to read response text from Hashflow failed request: {e}"
450 )));
451 if attempt < MAX_RETRIES - 1 {
452 tokio::time::sleep(Duration::from_millis(100)).await;
453 continue;
454 } else {
455 return Err(last_error.unwrap());
456 }
457 }
458 };
459 last_error = Some(RFQError::FatalError(format!(
460 "Failed to send Hashflow quote request: {err_msg}",
461 )));
462 if attempt < MAX_RETRIES - 1 {
463 warn!(
464 "Hashflow returned non-200 status (attempt {}/{}): {}",
465 attempt + 1,
466 MAX_RETRIES,
467 err_msg
468 );
469 tokio::time::sleep(Duration::from_millis(100)).await;
470 continue;
471 } else {
472 return Err(last_error.unwrap());
473 }
474 }
475
476 let quote_response = match response
477 .json::<HashflowQuoteResponse>()
478 .await
479 {
480 Ok(resp) => resp,
481 Err(e) => {
482 warn!(
483 "Hashflow quote response parsing failed (attempt {}/{}): {}",
484 attempt + 1,
485 MAX_RETRIES,
486 e
487 );
488 last_error = Some(RFQError::ParsingError(format!(
489 "Failed to parse Hashflow quote response: {e}"
490 )));
491 if attempt < MAX_RETRIES - 1 {
492 tokio::time::sleep(Duration::from_millis(100)).await;
493 continue;
494 } else {
495 return Err(last_error.unwrap());
496 }
497 }
498 };
499
500 match quote_response.status.as_str() {
501 "success" => {
502 if let Some(quotes) = quote_response.quotes {
503 if quotes.is_empty() {
504 return Err(RFQError::QuoteNotFound(format!(
505 "Hashflow quote not found for {} {} ->{}",
506 params.amount_in, params.token_in, params.token_out,
507 )))
508 }
509 let quote = quotes[0].clone();
511 quote.validate(params)?;
512
513 let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
514 quote_attributes.insert("pool".to_string(), quote.quote_data.pool);
515 if let Some(external_account) = quote.quote_data.external_account {
516 quote_attributes
517 .insert("external_account".to_string(), external_account);
518 } else {
519 quote_attributes.insert(
520 "external_account".to_string(),
521 Bytes::from_str(&Address::ZERO.to_string()).map_err(|_| {
522 RFQError::ParsingError(
523 "Failed to parse zero address".to_string(),
524 )
525 })?,
526 );
527 }
528 quote_attributes.insert("trader".to_string(), quote.quote_data.trader);
529 quote_attributes
530 .insert("base_token".to_string(), quote.quote_data.base_token);
531 quote_attributes
532 .insert("quote_token".to_string(), quote.quote_data.quote_token);
533 quote_attributes.insert(
534 "base_token_amount".to_string(),
535 Bytes::from(
536 biguint_to_u256(
537 &BigUint::from_str("e.quote_data.base_token_amount)
538 .map_err(|_| {
539 RFQError::ParsingError(format!(
540 "Failed to parse base token amount: {}",
541 quote.quote_data.base_token_amount
542 ))
543 })?,
544 )
545 .to_be_bytes::<32>()
546 .to_vec(),
547 ),
548 );
549 quote_attributes.insert(
550 "quote_token_amount".to_string(),
551 Bytes::from(
552 biguint_to_u256(
553 &BigUint::from_str("e.quote_data.quote_token_amount)
554 .map_err(|_| {
555 RFQError::ParsingError(format!(
556 "Failed to parse quote token amount: {}",
557 quote.quote_data.quote_token_amount
558 ))
559 })?,
560 )
561 .to_be_bytes::<32>()
562 .to_vec(),
563 ),
564 );
565 quote_attributes.insert(
566 "quote_expiry".to_string(),
567 Bytes::from(
568 U256::from(quote.quote_data.quote_expiry)
569 .to_be_bytes::<32>()
570 .to_vec(),
571 ),
572 );
573 quote_attributes.insert(
574 "nonce".to_string(),
575 Bytes::from(
576 U256::from(quote.quote_data.nonce)
577 .to_be_bytes::<32>()
578 .to_vec(),
579 ),
580 );
581 quote_attributes.insert("tx_id".to_string(), quote.quote_data.tx_id);
582 quote_attributes.insert("signature".to_string(), quote.signature);
583
584 let signed_quote = SignedQuote {
585 base_token: params.token_in.clone(),
586 quote_token: params.token_out.clone(),
587 amount_in: BigUint::from_str("e.quote_data.base_token_amount)
588 .map_err(|_| {
589 RFQError::ParsingError(format!(
590 "Failed to parse amount in string: {}",
591 quote.quote_data.base_token_amount
592 ))
593 })?,
594 amount_out: BigUint::from_str("e.quote_data.quote_token_amount)
595 .map_err(|_| {
596 RFQError::ParsingError(format!(
597 "Failed to parse amount out string: {}",
598 quote.quote_data.quote_token_amount
599 ))
600 })?,
601 quote_attributes,
602 };
603 return Ok(signed_quote);
604 } else {
605 return Err(RFQError::QuoteNotFound(format!(
606 "Hashflow quote not found for {} {} ->{}",
607 params.amount_in, params.token_in, params.token_out,
608 )));
609 }
610 }
611 "fail" => {
612 return Err(RFQError::FatalError(format!(
613 "Hashflow API error: {:?}",
614 quote_response.error
615 )));
616 }
617 _ => {
618 return Err(RFQError::FatalError(
619 "Hashflow API error: Unknown status".to_string(),
620 ));
621 }
622 }
623 }
624
625 Err(last_error.unwrap_or_else(|| {
626 RFQError::ConnectionError("Hashflow quote request failed after retries".to_string())
627 }))
628 }
629}
630
631#[cfg(test)]
632mod tests {
633 use std::{env, str::FromStr, time::Duration};
634
635 use dotenv::dotenv;
636 use futures::StreamExt;
637 use tokio::time::timeout;
638
639 use super::*;
640 use crate::rfq::{
641 constants::get_hashflow_auth,
642 protocols::hashflow::models::{HashflowPair, HashflowPriceLevel},
643 };
644
645 #[test]
646 fn test_normalize_tvl_same_quote_token() {
647 let client = create_test_client();
648 let levels = HashMap::new();
649
650 let result = client.normalize_tvl(
652 1000.0,
653 Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
654 &levels,
655 );
656 assert!(result.is_ok());
657 assert_eq!(result.unwrap(), 1000.0);
658 }
659
660 #[test]
661 fn test_normalize_tvl_different_quote_token() {
662 let client = create_test_client();
663 let mut levels = HashMap::new();
664 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
665 let usdc = Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
666
667 let eth_usdc_level = HashflowMarketMakerLevels {
669 pair: HashflowPair { base_token: weth.clone(), quote_token: usdc },
670 levels: vec![
671 HashflowPriceLevel { quantity: 1.0, price: 3000.0 }, ],
673 };
674
675 levels.insert("test_mm".to_string(), vec![eth_usdc_level]);
676
677 let result = client.normalize_tvl(2.0, weth, &levels);
679 assert!(result.is_ok());
680 assert_eq!(result.unwrap(), 6000.0);
682 }
683
684 #[test]
685 fn test_normalize_tvl_no_conversion_available() {
686 let client = create_test_client();
687 let levels = HashMap::new();
688 let result = client.normalize_tvl(
689 1000.0,
690 Bytes::from_str("0x1234567890123456789012345678901234567890").unwrap(),
691 &levels,
692 );
693 assert!(result.is_ok());
694 assert_eq!(result.unwrap(), 0.0);
695 }
696
697 fn create_test_client() -> HashflowClient {
698 let quote_tokens = HashSet::from([
699 Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(), ]);
702
703 HashflowClient::new(
704 Chain::Ethereum,
705 HashSet::new(),
706 1.0,
707 quote_tokens,
708 "test_user".to_string(),
709 "test_key".to_string(),
710 Duration::from_secs(5),
711 Duration::from_secs(5),
712 )
713 .unwrap()
714 }
715
716 #[tokio::test]
717 #[ignore] async fn test_hashflow_api_polling() {
719 dotenv().expect("Missing .env file");
720 let auth = get_hashflow_auth().unwrap();
721
722 let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
723 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
724
725 let tokens = HashSet::from([wbtc, weth.clone()]);
726
727 let quote_tokens = HashSet::from([
728 Bytes::from_str("0xa0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), ]);
731
732 let client = HashflowClient::new(
733 Chain::Ethereum,
734 tokens,
735 1.0, quote_tokens,
737 auth.user,
738 auth.key,
739 Duration::from_secs(1),
740 Duration::from_secs(5),
741 )
742 .unwrap();
743
744 let mut stream = client.stream();
745
746 let result = timeout(Duration::from_secs(10), async {
747 let mut message_count = 0;
748 let max_messages = 3;
749 let mut total_components_received = 0;
750
751 while let Some(result) = stream.next().await {
752 match result {
753 Ok((component_id, msg)) => {
754 println!("Received message with ID: {component_id}");
755
756 assert!(!component_id.is_empty());
757 assert_eq!(component_id, "hashflow");
758 assert!(msg.header.timestamp > 0);
759
760 let snapshot = &msg.snapshots;
761 total_components_received += snapshot.states.len();
762
763 println!("Received {} components in this message (Total so far: {})",
764 snapshot.states.len(), total_components_received);
765
766 for (id, component_with_state) in &snapshot.states {
767 let attributes = &component_with_state.state.attributes;
768 let levels: &Bytes = attributes.get("levels").unwrap();
769 if attributes.contains_key("levels") {
771 println!("{levels:?}");
772 assert!(!attributes["levels"].is_empty());
773 }
774 if attributes.contains_key("mm") {
776 assert!(!attributes["mm"].is_empty());
777 }
778
779 if let Some(tvl) = component_with_state.component_tvl {
780 assert!(tvl >= 1.0);
781 println!("Component {id} TVL: ${tvl:.2}");
782 }
783 }
784
785 message_count += 1;
786 if message_count >= max_messages {
787 break;
788 }
789 }
790 Err(e) => {
791 panic!("Stream error: {e}");
792 }
793 }
794 }
795
796 assert!(message_count > 0, "Should have received at least one message");
797 assert!(total_components_received >= 1, "Should have received at least 1 component with $1 TVL threshold");
798 println!("Successfully received {message_count} messages with {total_components_received} total components");
799 })
800 .await;
801
802 match result {
803 Ok(_) => println!("Test completed successfully"),
804 Err(_) => panic!("Test timed out - no messages received within 5 seconds"),
805 }
806 }
807
808 #[tokio::test]
809 #[ignore] async fn test_request_binding_quote() {
811 let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
812 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
813
814 let auth_user = String::from("propellerheads");
815 dotenv().expect("Missing .env file");
816 let auth_key = env::var("HASHFLOW_KEY").unwrap();
817
818 let client = HashflowClient::new(
819 Chain::Ethereum,
820 HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
821 10.0,
822 HashSet::new(),
823 auth_user,
824 auth_key,
825 Duration::from_secs(0),
826 Duration::from_secs(5),
827 )
828 .unwrap();
829
830 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
831
832 let params = GetAmountOutParams {
833 amount_in: BigUint::from(1_000000000000000000u64),
834 token_in: weth.clone(),
835 token_out: wbtc.clone(),
836 sender: router.clone(),
837 receiver: router.clone(),
838 };
839 let quote = client
840 .request_binding_quote(¶ms)
841 .await
842 .unwrap();
843
844 assert_eq!(quote.base_token, weth);
845 assert_eq!(quote.quote_token, wbtc);
846 assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
847
848 assert!(quote.amount_out > BigUint::from(3000000u64));
850
851 assert_eq!(quote.quote_attributes.len(), 11);
852 let expected_attributes = [
853 "pool",
854 "external_account",
855 "trader",
856 "base_token",
857 "quote_token",
858 "base_token_amount",
859 "quote_token_amount",
860 "quote_expiry",
861 "nonce",
862 "tx_id",
863 "signature",
864 ];
865 for attr in expected_attributes {
866 assert!(
867 quote
868 .quote_attributes
869 .contains_key(attr),
870 "Missing attribute: {attr}"
871 );
872 }
873 assert_eq!(
874 quote
875 .quote_attributes
876 .get("trader")
877 .unwrap(),
878 &router
879 );
880 }
881
882 async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
884 use tokio::{io::AsyncWriteExt, net::TcpListener};
885
886 let listener = TcpListener::bind("127.0.0.1:0")
887 .await
888 .unwrap();
889 let addr = listener.local_addr().unwrap();
890
891 let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
892
893 tokio::spawn(async move {
894 while let Ok((mut stream, _)) = listener.accept().await {
895 let json_response_clone = json_response.to_owned();
896 tokio::spawn(async move {
897 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
898 let response = format!(
899 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
900 json_response_clone.len(),
901 json_response_clone
902 );
903 let _ = stream
904 .write_all(response.as_bytes())
905 .await;
906 let _ = stream.flush().await;
907 let _ = stream.shutdown().await;
908 });
909 }
910 });
911
912 tokio::time::sleep(Duration::from_millis(50)).await;
913 addr
914 }
915
916 fn create_test_hashflow_client(
917 quote_endpoint: String,
918 quote_timeout: Duration,
919 ) -> HashflowClient {
920 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
921 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
922
923 HashflowClient {
924 chain: Chain::Ethereum,
925 price_levels_endpoint: "http://unused/price-levels".to_string(),
926 market_makers_endpoint: "http://unused/market-makers".to_string(),
927 quote_endpoint,
928 tokens: HashSet::from([token_in, token_out]),
929 tvl: 10.0,
930 auth_key: "test_key".to_string(),
931 auth_user: "test_user".to_string(),
932 quote_tokens: HashSet::new(),
933 poll_time: Duration::from_secs(0),
934 quote_timeout,
935 }
936 }
937
938 fn create_test_quote_params() -> GetAmountOutParams {
940 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
941 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
942 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
943
944 GetAmountOutParams {
945 amount_in: BigUint::from(1_000000000000000000u64),
946 token_in,
947 token_out,
948 sender: router.clone(),
949 receiver: router,
950 }
951 }
952
953 #[tokio::test]
954 async fn test_hashflow_quote_timeout() {
955 let addr = create_delayed_response_server(500).await;
956
957 let client_short_timeout = create_test_hashflow_client(
959 format!("http://127.0.0.1:{}/rfq", addr.port()),
960 Duration::from_millis(200),
961 );
962 let params = create_test_quote_params();
963
964 let start = std::time::Instant::now();
966 let result = client_short_timeout
967 .request_binding_quote(¶ms)
968 .await;
969 let elapsed = start.elapsed();
970
971 assert!(result.is_err());
973 let err = result.unwrap_err();
974 match err {
975 RFQError::ConnectionError(msg) => {
976 assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
977 }
978 _ => panic!("Expected ConnectionError, got: {:?}", err),
979 }
980 assert!(
982 elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
983 "Expected timeout around 200ms, got: {:?}",
984 elapsed
985 );
986
987 let client_long_timeout = create_test_hashflow_client(
991 format!("http://127.0.0.1:{}/rfq", addr.port()),
992 Duration::from_secs(1),
993 );
994
995 let result = client_long_timeout
997 .request_binding_quote(¶ms)
998 .await;
999
1000 assert!(result.is_ok(), "Expected success, got: {:?}", result);
1002 }
1003
1004 async fn create_retry_server() -> (std::net::SocketAddr, std::sync::Arc<std::sync::Mutex<u32>>)
1006 {
1007 use std::sync::{Arc, Mutex};
1008
1009 use tokio::{io::AsyncWriteExt, net::TcpListener};
1010
1011 let request_count = Arc::new(Mutex::new(0u32));
1012 let request_count_clone = request_count.clone();
1013
1014 let listener = TcpListener::bind("127.0.0.1:0")
1015 .await
1016 .unwrap();
1017 let addr = listener.local_addr().unwrap();
1018
1019 let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
1020
1021 tokio::spawn(async move {
1022 while let Ok((mut stream, _)) = listener.accept().await {
1023 let count_clone = request_count_clone.clone();
1024 let json_response_clone = json_response.to_owned();
1025 tokio::spawn(async move {
1026 *count_clone.lock().unwrap() += 1;
1027 let count = *count_clone.lock().unwrap();
1028 println!("Mock server: Received request #{count}");
1029
1030 if count <= 2 {
1031 let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1032 let _ = stream
1033 .write_all(response.as_bytes())
1034 .await;
1035 } else {
1036 let response = format!(
1037 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1038 json_response_clone.len(),
1039 json_response_clone
1040 );
1041 let _ = stream
1042 .write_all(response.as_bytes())
1043 .await;
1044 }
1045 let _ = stream.flush().await;
1046 let _ = stream.shutdown().await;
1047 });
1048 }
1049 });
1050
1051 tokio::time::sleep(Duration::from_millis(50)).await;
1052 (addr, request_count)
1053 }
1054
1055 #[tokio::test]
1056 async fn test_hashflow_quote_retry_on_bad_response() {
1057 let (addr, request_count) = create_retry_server().await;
1058
1059 let client = create_test_hashflow_client(
1060 format!("http://127.0.0.1:{}/rfq", addr.port()),
1061 Duration::from_secs(5),
1062 );
1063 let params = create_test_quote_params();
1064 let result = client
1065 .request_binding_quote(¶ms)
1066 .await;
1067
1068 assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1069 let quote = result.unwrap();
1070
1071 assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
1073 assert_eq!(quote.amount_out, BigUint::from(3329502u64));
1074
1075 let final_count = *request_count.lock().unwrap();
1077 assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1078 }
1079}