1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address, U256};
8use async_trait::async_trait;
9use futures::stream::BoxStream;
10use num_bigint::BigUint;
11use reqwest::Client;
12use serde::{Deserialize, Serialize};
13use tokio::time::{interval, timeout, Duration};
14use tracing::{error, info, warn};
15use tycho_common::{
16 models::{protocol::GetAmountOutParams, Chain},
17 simulation::indicatively_priced::SignedQuote,
18 Bytes,
19};
20
21use crate::{
22 evm::protocol::u256_num::biguint_to_u256,
23 rfq::{
24 client::RFQClient,
25 errors::RFQError,
26 models::TimestampHeader,
27 protocols::hashflow::models::{
28 HashflowChain, HashflowMarketMakerLevels, HashflowMarketMakersResponse,
29 HashflowPriceLevelsResponse, HashflowQuoteRequest, HashflowQuoteResponse, HashflowRFQ,
30 },
31 },
32 tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
33 tycho_common::dto::{ProtocolComponent, ResponseProtocolState},
34};
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
37pub struct HashflowClient {
38 chain: Chain,
39 price_levels_endpoint: String,
40 market_makers_endpoint: String,
41 quote_endpoint: String,
42 tokens: HashSet<Bytes>,
44 tvl: f64,
46 #[serde(skip_serializing, default)]
47 auth_key: String,
48 #[serde(skip_serializing, default)]
49 auth_user: String,
50 quote_tokens: HashSet<Bytes>,
52 poll_time: Duration,
53 quote_timeout: Duration,
54}
55
56impl HashflowClient {
57 pub const PROTOCOL_SYSTEM: &'static str = "rfq:hashflow";
58
59 #[allow(clippy::too_many_arguments)]
60 pub fn new(
61 chain: Chain,
62 tokens: HashSet<Bytes>,
63 tvl: f64,
64 quote_tokens: HashSet<Bytes>,
65 auth_user: String,
66 auth_key: String,
67 poll_time: Duration,
68 quote_timeout: Duration,
69 ) -> Result<Self, RFQError> {
70 Ok(Self {
71 chain,
72 price_levels_endpoint: "https://api.hashflow.com/taker/v3/price-levels".to_string(),
73 market_makers_endpoint: "https://api.hashflow.com/taker/v3/market-makers".to_string(),
74 quote_endpoint: "https://api.hashflow.com/taker/v3/rfq".to_string(),
75 tokens,
76 tvl,
77 auth_key,
78 auth_user,
79 quote_tokens,
80 poll_time,
81 quote_timeout,
82 })
83 }
84
85 fn normalize_tvl(
88 &self,
89 raw_tvl: f64,
90 quote_token: Bytes,
91 levels_by_mm: &HashMap<String, Vec<HashflowMarketMakerLevels>>,
92 ) -> Result<f64, RFQError> {
93 if self.quote_tokens.contains("e_token) {
95 return Ok(raw_tvl);
96 }
97
98 for approved_quote_token in &self.quote_tokens {
101 for (_mm, mm_levels_inner) in levels_by_mm.iter() {
102 for quote_mm_level in mm_levels_inner {
103 if quote_mm_level.pair.base_token == quote_token &&
105 quote_mm_level.pair.quote_token == *approved_quote_token
106 {
107 if let Some(price) = quote_mm_level.get_price(1.0) {
108 return Ok(raw_tvl * price);
109 }
110 }
111 }
112 }
113 }
114
115 Ok(0.0)
117 }
118
119 fn create_component_with_state(
120 &self,
121 component_id: String,
122 tokens: Vec<Bytes>,
123 mm_name: &str,
124 mm_level: &HashflowMarketMakerLevels,
125 tvl: f64,
126 ) -> ComponentWithState {
127 let protocol_component = ProtocolComponent {
128 id: component_id.clone(),
129 protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
130 protocol_type_name: "hashflow_pool".to_string(),
131 chain: self.chain.into(),
132 tokens,
133 contract_ids: vec![], ..Default::default()
135 };
136
137 let mut attributes = HashMap::new();
138
139 if !mm_level.levels.is_empty() {
141 let levels_json = serde_json::to_string(&mm_level.levels).unwrap_or_default();
142 attributes.insert("levels".to_string(), levels_json.as_bytes().to_vec().into());
143 }
144 attributes.insert("mm".to_string(), mm_name.as_bytes().to_vec().into());
145
146 ComponentWithState {
147 state: ResponseProtocolState {
148 component_id: component_id.clone(),
149 attributes,
150 balances: HashMap::new(),
151 },
152 component: protocol_component,
153 component_tvl: Some(tvl),
154 entrypoints: vec![],
155 }
156 }
157
158 async fn fetch_market_makers(&mut self) -> Result<Vec<String>, RFQError> {
159 let query_params = vec![
160 ("source", self.auth_user.clone()),
161 ("baseChainType", "evm".to_string()),
162 ("baseChainId", self.chain.id().to_string()),
163 ];
164
165 let http_client = Client::new();
166 let request = http_client
167 .get(&self.market_makers_endpoint)
168 .query(&query_params)
169 .header("accept", "application/json")
170 .header("Authorization", &self.auth_key);
171
172 let response = request.send().await.map_err(|e| {
173 RFQError::ConnectionError(format!("Failed to fetch market makers: {e}"))
174 })?;
175
176 if !response.status().is_success() {
177 return Err(RFQError::ConnectionError(format!(
178 "HTTP error {}: {}",
179 response.status(),
180 response
181 .text()
182 .await
183 .unwrap_or_default()
184 )));
185 }
186
187 let mm_response: HashflowMarketMakersResponse = response.json().await.map_err(|e| {
188 RFQError::ParsingError(format!("Failed to parse market makers response: {e}"))
189 })?;
190
191 info!(
192 "Fetched {} market makers: {:?}",
193 mm_response.market_makers.len(),
194 mm_response.market_makers
195 );
196
197 Ok(mm_response.market_makers)
198 }
199
200 async fn fetch_price_levels(
201 &self,
202 market_makers: &Vec<String>,
203 ) -> Result<HashMap<String, Vec<HashflowMarketMakerLevels>>, RFQError> {
204 let mut query_params = vec![
205 ("source", self.auth_user.clone()),
206 ("baseChainType", "evm".to_string()),
207 ("baseChainId", self.chain.id().to_string()),
208 ];
209
210 for mm in market_makers {
212 query_params.push(("marketMakers[]", mm.clone()));
213 }
214
215 let http_client = Client::new();
216 let request = http_client
217 .get(&self.price_levels_endpoint)
218 .query(&query_params)
219 .header("accept", "application/json")
220 .header("Authorization", &self.auth_key);
221
222 let response = request
223 .send()
224 .await
225 .map_err(|e| RFQError::ConnectionError(format!("Failed to fetch price levels: {e}")))?;
226
227 if !response.status().is_success() {
228 return Err(RFQError::ConnectionError(format!(
229 "HTTP error {}: {}",
230 response.status(),
231 response
232 .text()
233 .await
234 .unwrap_or_default()
235 )));
236 }
237
238 let price_response: HashflowPriceLevelsResponse = response.json().await.map_err(|e| {
239 RFQError::ParsingError(format!("Failed to parse price levels response: {e}"))
240 })?;
241
242 if price_response.status != "success" {
243 return Err(RFQError::InvalidInput(format!(
244 "API returned error status: {}",
245 price_response.error.unwrap_or_default()
246 )));
247 }
248
249 price_response
250 .levels
251 .ok_or_else(|| RFQError::ParsingError("API response missing levels".to_string()))
252 }
253}
254
255#[async_trait]
256impl RFQClient for HashflowClient {
257 fn stream(
258 &self,
259 ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
260 let mut client = self.clone();
261
262 Box::pin(async_stream::stream! {
263 let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
264 let mut ticker = interval(client.poll_time);
265
266 info!("Starting Hashflow price levels polling every {} seconds", client.poll_time.as_secs());
267 info!("TVL threshold: {:.2}", client.tvl);
268
269 loop {
270 ticker.tick().await;
271
272 let market_makers;
273 match client.fetch_market_makers().await {
274 Ok(mms) => {
275 market_makers = mms;
276 info!("Successfully fetched market makers");
277 }
278 Err(e) => {
279 info!("Failed to fetch market makers: {}", e);
280 continue;
281 }
282 }
283
284 match client.fetch_price_levels(&market_makers).await {
285 Ok(levels_by_mm) => {
286 let mut new_components = HashMap::new();
287
288 info!("Fetched price levels from {} market makers", levels_by_mm.len());
289 for (mm_name, mm_levels) in levels_by_mm.iter() {
291 for mm_level in mm_levels {
292 let base_token = &mm_level.pair.base_token;
293 let quote_token = &mm_level.pair.quote_token;
294
295 if client.tokens.contains(base_token) && client.tokens.contains(quote_token) {
297 let tokens = vec![base_token.clone(), quote_token.clone()];
298 let tvl = mm_level.calculate_tvl();
299
300 let normalized_tvl = client.normalize_tvl(
302 tvl,
303 mm_level.pair.quote_token.clone(),
304 &levels_by_mm,
305 )?;
306
307 let pair_str = format!("hashflow_{}/{}", hex::encode(base_token), hex::encode(quote_token));
309 let component_id = format!("{}", keccak256(pair_str.as_bytes()));
310
311 if normalized_tvl < client.tvl {
312 info!("Filtering out component {} due to low TVL: {:.2} < {:.2}",
313 component_id, normalized_tvl, client.tvl);
314 continue;
315 }
316
317 let component_with_state = client.create_component_with_state(
318 component_id.clone(),
319 tokens,
320 mm_name,
321 mm_level,
322 normalized_tvl
323 );
324 new_components.insert(component_id, component_with_state);
325 }
326 }
327 }
328
329 let removed_components: HashMap<String, ProtocolComponent> = current_components
331 .iter()
332 .filter(|&(id, _)| !new_components.contains_key(id))
333 .map(|(k, v)| (k.clone(), v.component.clone()))
334 .collect();
335
336 current_components = new_components.clone();
338
339 let snapshot = Snapshot {
340 states: new_components,
341 vm_storage: HashMap::new(),
342 };
343 let timestamp = SystemTime::now().duration_since(
344 SystemTime::UNIX_EPOCH
345 ).map_err(
346 |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
347 )?.as_secs();
348
349 let msg = StateSyncMessage::<TimestampHeader> {
350 header: TimestampHeader { timestamp },
351 snapshots: snapshot,
352 deltas: None,
353 removed_components,
354 };
355
356 yield Ok(("hashflow".to_string(), msg));
357 },
358 Err(e) => {
359 error!("Failed to fetch price levels from Hashflow API: {}", e);
360 continue;
361 }
362 }
363 }
364 })
365 }
366
367 async fn request_binding_quote(
368 &self,
369 params: &GetAmountOutParams,
370 ) -> Result<SignedQuote, RFQError> {
371 let hashflow_chain = HashflowChain::from(self.chain);
372 let quote_request = HashflowQuoteRequest {
373 source: self.auth_user.clone(),
374 base_chain: hashflow_chain.clone(),
375 quote_chain: hashflow_chain,
376 rfqs: vec![HashflowRFQ {
377 base_token: params.token_in.to_string(),
378 quote_token: params.token_out.to_string(),
379 base_token_amount: Some(params.amount_in.to_string()),
380 quote_token_amount: None,
381 trader: params.receiver.to_string(),
382 effective_trader: None,
383 }],
384 calldata: false,
385 };
386
387 let url = self.quote_endpoint.clone();
388
389 let start_time = std::time::Instant::now();
390 const MAX_RETRIES: u32 = 3;
391 let mut last_error = None;
392
393 for attempt in 0..MAX_RETRIES {
394 let elapsed = start_time.elapsed();
396 if elapsed >= self.quote_timeout {
397 return Err(last_error.unwrap_or_else(|| {
398 RFQError::ConnectionError(format!(
399 "Hashflow quote request timed out after {} seconds",
400 self.quote_timeout.as_secs()
401 ))
402 }));
403 }
404
405 let remaining_time = self.quote_timeout - elapsed;
406
407 let http_client = Client::new();
408 let request = http_client
409 .post(&url)
410 .json("e_request)
411 .header("accept", "application/json")
412 .header("Authorization", &self.auth_key);
413
414 let response = match timeout(remaining_time, request.send()).await {
415 Ok(Ok(resp)) => resp,
416 Ok(Err(e)) => {
417 warn!(
418 "Hashflow quote request failed (attempt {}/{}): {}",
419 attempt + 1,
420 MAX_RETRIES,
421 e
422 );
423 last_error = Some(RFQError::ConnectionError(format!(
424 "Failed to send Hashflow quote request: {e}"
425 )));
426 if attempt < MAX_RETRIES - 1 {
427 tokio::time::sleep(Duration::from_millis(100)).await;
428 continue;
429 } else {
430 return Err(last_error.unwrap());
431 }
432 }
433 Err(_) => {
434 return Err(RFQError::ConnectionError(format!(
435 "Hashflow quote request timed out after {} seconds",
436 self.quote_timeout.as_secs()
437 )));
438 }
439 };
440
441 if response.status() != 200 {
442 let err_msg = match response.text().await {
443 Ok(text) => text,
444 Err(e) => {
445 warn!(
446 "Hashflow error response parsing failed (attempt {}/{}): {}",
447 attempt + 1,
448 MAX_RETRIES,
449 e
450 );
451 last_error = Some(RFQError::ParsingError(format!(
452 "Failed to read response text from Hashflow failed request: {e}"
453 )));
454 if attempt < MAX_RETRIES - 1 {
455 tokio::time::sleep(Duration::from_millis(100)).await;
456 continue;
457 } else {
458 return Err(last_error.unwrap());
459 }
460 }
461 };
462 last_error = Some(RFQError::FatalError(format!(
463 "Failed to send Hashflow quote request: {err_msg}",
464 )));
465 if attempt < MAX_RETRIES - 1 {
466 warn!(
467 "Hashflow returned non-200 status (attempt {}/{}): {}",
468 attempt + 1,
469 MAX_RETRIES,
470 err_msg
471 );
472 tokio::time::sleep(Duration::from_millis(100)).await;
473 continue;
474 } else {
475 return Err(last_error.unwrap());
476 }
477 }
478
479 let quote_response = match response
480 .json::<HashflowQuoteResponse>()
481 .await
482 {
483 Ok(resp) => resp,
484 Err(e) => {
485 warn!(
486 "Hashflow quote response parsing failed (attempt {}/{}): {}",
487 attempt + 1,
488 MAX_RETRIES,
489 e
490 );
491 last_error = Some(RFQError::ParsingError(format!(
492 "Failed to parse Hashflow quote response: {e}"
493 )));
494 if attempt < MAX_RETRIES - 1 {
495 tokio::time::sleep(Duration::from_millis(100)).await;
496 continue;
497 } else {
498 return Err(last_error.unwrap());
499 }
500 }
501 };
502
503 match quote_response.status.as_str() {
504 "success" => {
505 if let Some(quotes) = quote_response.quotes {
506 if quotes.is_empty() {
507 return Err(RFQError::QuoteNotFound(format!(
508 "Hashflow quote not found for {} {} ->{}",
509 params.amount_in, params.token_in, params.token_out,
510 )));
511 }
512 let quote = quotes[0].clone();
514 quote.validate(params)?;
515
516 let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
517 quote_attributes.insert("pool".to_string(), quote.quote_data.pool);
518 if let Some(external_account) = quote.quote_data.external_account {
519 quote_attributes
520 .insert("external_account".to_string(), external_account);
521 } else {
522 quote_attributes.insert(
523 "external_account".to_string(),
524 Bytes::from_str(&Address::ZERO.to_string()).map_err(|_| {
525 RFQError::ParsingError(
526 "Failed to parse zero address".to_string(),
527 )
528 })?,
529 );
530 }
531 quote_attributes.insert("trader".to_string(), quote.quote_data.trader);
532 quote_attributes
533 .insert("base_token".to_string(), quote.quote_data.base_token);
534 quote_attributes
535 .insert("quote_token".to_string(), quote.quote_data.quote_token);
536 quote_attributes.insert(
537 "base_token_amount".to_string(),
538 Bytes::from(
539 biguint_to_u256(
540 &BigUint::from_str("e.quote_data.base_token_amount)
541 .map_err(|_| {
542 RFQError::ParsingError(format!(
543 "Failed to parse base token amount: {}",
544 quote.quote_data.base_token_amount
545 ))
546 })?,
547 )
548 .to_be_bytes::<32>()
549 .to_vec(),
550 ),
551 );
552 quote_attributes.insert(
553 "quote_token_amount".to_string(),
554 Bytes::from(
555 biguint_to_u256(
556 &BigUint::from_str("e.quote_data.quote_token_amount)
557 .map_err(|_| {
558 RFQError::ParsingError(format!(
559 "Failed to parse quote token amount: {}",
560 quote.quote_data.quote_token_amount
561 ))
562 })?,
563 )
564 .to_be_bytes::<32>()
565 .to_vec(),
566 ),
567 );
568 quote_attributes.insert(
569 "quote_expiry".to_string(),
570 Bytes::from(
571 U256::from(quote.quote_data.quote_expiry)
572 .to_be_bytes::<32>()
573 .to_vec(),
574 ),
575 );
576 quote_attributes.insert(
577 "nonce".to_string(),
578 Bytes::from(
579 U256::from(quote.quote_data.nonce)
580 .to_be_bytes::<32>()
581 .to_vec(),
582 ),
583 );
584 quote_attributes.insert("tx_id".to_string(), quote.quote_data.tx_id);
585 quote_attributes.insert("signature".to_string(), quote.signature);
586
587 let signed_quote = SignedQuote {
588 base_token: params.token_in.clone(),
589 quote_token: params.token_out.clone(),
590 amount_in: BigUint::from_str("e.quote_data.base_token_amount)
591 .map_err(|_| {
592 RFQError::ParsingError(format!(
593 "Failed to parse amount in string: {}",
594 quote.quote_data.base_token_amount
595 ))
596 })?,
597 amount_out: BigUint::from_str("e.quote_data.quote_token_amount)
598 .map_err(|_| {
599 RFQError::ParsingError(format!(
600 "Failed to parse amount out string: {}",
601 quote.quote_data.quote_token_amount
602 ))
603 })?,
604 quote_attributes,
605 };
606 return Ok(signed_quote);
607 } else {
608 return Err(RFQError::QuoteNotFound(format!(
609 "Hashflow quote not found for {} {} ->{}",
610 params.amount_in, params.token_in, params.token_out,
611 )));
612 }
613 }
614 "fail" => {
615 return Err(RFQError::FatalError(format!(
616 "Hashflow API error: {:?}",
617 quote_response.error
618 )));
619 }
620 _ => {
621 return Err(RFQError::FatalError(
622 "Hashflow API error: Unknown status".to_string(),
623 ));
624 }
625 }
626 }
627
628 Err(last_error.unwrap_or_else(|| {
629 RFQError::ConnectionError("Hashflow quote request failed after retries".to_string())
630 }))
631 }
632}
633
634#[cfg(test)]
635mod tests {
636 use std::{env, str::FromStr, time::Duration};
637
638 use dotenv::dotenv;
639 use futures::StreamExt;
640 use tokio::time::timeout;
641
642 use super::*;
643 use crate::rfq::{
644 constants::get_hashflow_auth,
645 protocols::hashflow::models::{HashflowPair, HashflowPriceLevel},
646 };
647
648 #[test]
649 fn test_normalize_tvl_same_quote_token() {
650 let client = create_test_client();
651 let levels = HashMap::new();
652
653 let result = client.normalize_tvl(
655 1000.0,
656 Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
657 &levels,
658 );
659 assert!(result.is_ok());
660 assert_eq!(result.unwrap(), 1000.0);
661 }
662
663 #[test]
664 fn test_normalize_tvl_different_quote_token() {
665 let client = create_test_client();
666 let mut levels = HashMap::new();
667 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
668 let usdc = Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
669
670 let eth_usdc_level = HashflowMarketMakerLevels {
672 pair: HashflowPair { base_token: weth.clone(), quote_token: usdc },
673 levels: vec![
674 HashflowPriceLevel { quantity: 1.0, price: 3000.0 }, ],
676 };
677
678 levels.insert("test_mm".to_string(), vec![eth_usdc_level]);
679
680 let result = client.normalize_tvl(2.0, weth, &levels);
682 assert!(result.is_ok());
683 assert_eq!(result.unwrap(), 6000.0);
685 }
686
687 #[test]
688 fn test_normalize_tvl_no_conversion_available() {
689 let client = create_test_client();
690 let levels = HashMap::new();
691 let result = client.normalize_tvl(
692 1000.0,
693 Bytes::from_str("0x1234567890123456789012345678901234567890").unwrap(),
694 &levels,
695 );
696 assert!(result.is_ok());
697 assert_eq!(result.unwrap(), 0.0);
698 }
699
700 fn create_test_client() -> HashflowClient {
701 let quote_tokens = HashSet::from([
702 Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(), ]);
705
706 HashflowClient::new(
707 Chain::Ethereum,
708 HashSet::new(),
709 1.0,
710 quote_tokens,
711 "test_user".to_string(),
712 "test_key".to_string(),
713 Duration::from_secs(5),
714 Duration::from_secs(5),
715 )
716 .unwrap()
717 }
718
719 #[tokio::test]
720 #[ignore] async fn test_hashflow_api_polling() {
722 dotenv().expect("Missing .env file");
723 let auth = get_hashflow_auth().unwrap();
724
725 let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
726 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
727
728 let tokens = HashSet::from([wbtc, weth.clone()]);
729
730 let quote_tokens = HashSet::from([
731 Bytes::from_str("0xa0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), ]);
734
735 let client = HashflowClient::new(
736 Chain::Ethereum,
737 tokens,
738 1.0, quote_tokens,
740 auth.user,
741 auth.key,
742 Duration::from_secs(1),
743 Duration::from_secs(5),
744 )
745 .unwrap();
746
747 let mut stream = client.stream();
748
749 let result = timeout(Duration::from_secs(10), async {
750 let mut message_count = 0;
751 let max_messages = 3;
752 let mut total_components_received = 0;
753
754 while let Some(result) = stream.next().await {
755 match result {
756 Ok((component_id, msg)) => {
757 println!("Received message with ID: {component_id}");
758
759 assert!(!component_id.is_empty());
760 assert_eq!(component_id, "hashflow");
761 assert!(msg.header.timestamp > 0);
762
763 let snapshot = &msg.snapshots;
764 total_components_received += snapshot.states.len();
765
766 println!("Received {} components in this message (Total so far: {})",
767 snapshot.states.len(), total_components_received);
768
769 for (id, component_with_state) in &snapshot.states {
770 let attributes = &component_with_state.state.attributes;
771 let levels: &Bytes = attributes.get("levels").unwrap();
772 if attributes.contains_key("levels") {
774 println!("{levels:?}");
775 assert!(!attributes["levels"].is_empty());
776 }
777 if attributes.contains_key("mm") {
779 assert!(!attributes["mm"].is_empty());
780 }
781
782 if let Some(tvl) = component_with_state.component_tvl {
783 assert!(tvl >= 1.0);
784 println!("Component {id} TVL: ${tvl:.2}");
785 }
786 }
787
788 message_count += 1;
789 if message_count >= max_messages {
790 break;
791 }
792 }
793 Err(e) => {
794 panic!("Stream error: {e}");
795 }
796 }
797 }
798
799 assert!(message_count > 0, "Should have received at least one message");
800 assert!(total_components_received >= 1, "Should have received at least 1 component with $1 TVL threshold");
801 println!("Successfully received {message_count} messages with {total_components_received} total components");
802 })
803 .await;
804
805 match result {
806 Ok(_) => println!("Test completed successfully"),
807 Err(_) => panic!("Test timed out - no messages received within 5 seconds"),
808 }
809 }
810
811 #[tokio::test]
812 #[ignore] async fn test_request_binding_quote() {
814 let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
815 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
816
817 let auth_user = String::from("propellerheads");
818 dotenv().expect("Missing .env file");
819 let auth_key = env::var("HASHFLOW_KEY").unwrap();
820
821 let client = HashflowClient::new(
822 Chain::Ethereum,
823 HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
824 10.0,
825 HashSet::new(),
826 auth_user,
827 auth_key,
828 Duration::from_secs(0),
829 Duration::from_secs(5),
830 )
831 .unwrap();
832
833 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
834
835 let params = GetAmountOutParams {
836 amount_in: BigUint::from(1_000000000000000000u64),
837 token_in: weth.clone(),
838 token_out: wbtc.clone(),
839 sender: router.clone(),
840 receiver: router.clone(),
841 };
842 let quote = client
843 .request_binding_quote(¶ms)
844 .await
845 .unwrap();
846
847 assert_eq!(quote.base_token, weth);
848 assert_eq!(quote.quote_token, wbtc);
849 assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
850
851 assert!(quote.amount_out > BigUint::from(3000000u64));
853
854 assert_eq!(quote.quote_attributes.len(), 11);
855 let expected_attributes = [
856 "pool",
857 "external_account",
858 "trader",
859 "base_token",
860 "quote_token",
861 "base_token_amount",
862 "quote_token_amount",
863 "quote_expiry",
864 "nonce",
865 "tx_id",
866 "signature",
867 ];
868 for attr in expected_attributes {
869 assert!(
870 quote
871 .quote_attributes
872 .contains_key(attr),
873 "Missing attribute: {attr}"
874 );
875 }
876 assert_eq!(
877 quote
878 .quote_attributes
879 .get("trader")
880 .unwrap(),
881 &router
882 );
883 }
884
885 async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
887 use tokio::{io::AsyncWriteExt, net::TcpListener};
888
889 let listener = TcpListener::bind("127.0.0.1:0")
890 .await
891 .unwrap();
892 let addr = listener.local_addr().unwrap();
893
894 let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
895
896 tokio::spawn(async move {
897 while let Ok((mut stream, _)) = listener.accept().await {
898 let json_response_clone = json_response.to_owned();
899 tokio::spawn(async move {
900 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
901 let response = format!(
902 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
903 json_response_clone.len(),
904 json_response_clone
905 );
906 let _ = stream
907 .write_all(response.as_bytes())
908 .await;
909 let _ = stream.flush().await;
910 let _ = stream.shutdown().await;
911 });
912 }
913 });
914
915 tokio::time::sleep(Duration::from_millis(50)).await;
916 addr
917 }
918
919 fn create_test_hashflow_client(
920 quote_endpoint: String,
921 quote_timeout: Duration,
922 ) -> HashflowClient {
923 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
924 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
925
926 HashflowClient {
927 chain: Chain::Ethereum,
928 price_levels_endpoint: "http://unused/price-levels".to_string(),
929 market_makers_endpoint: "http://unused/market-makers".to_string(),
930 quote_endpoint,
931 tokens: HashSet::from([token_in, token_out]),
932 tvl: 10.0,
933 auth_key: "test_key".to_string(),
934 auth_user: "test_user".to_string(),
935 quote_tokens: HashSet::new(),
936 poll_time: Duration::from_secs(0),
937 quote_timeout,
938 }
939 }
940
941 fn create_test_quote_params() -> GetAmountOutParams {
943 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
944 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
945 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
946
947 GetAmountOutParams {
948 amount_in: BigUint::from(1_000000000000000000u64),
949 token_in,
950 token_out,
951 sender: router.clone(),
952 receiver: router,
953 }
954 }
955
956 #[tokio::test]
957 async fn test_hashflow_quote_timeout() {
958 let addr = create_delayed_response_server(500).await;
959
960 let client_short_timeout = create_test_hashflow_client(
962 format!("http://127.0.0.1:{}/rfq", addr.port()),
963 Duration::from_millis(200),
964 );
965 let params = create_test_quote_params();
966
967 let start = std::time::Instant::now();
969 let result = client_short_timeout
970 .request_binding_quote(¶ms)
971 .await;
972 let elapsed = start.elapsed();
973
974 assert!(result.is_err());
976 let err = result.unwrap_err();
977 match err {
978 RFQError::ConnectionError(msg) => {
979 assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
980 }
981 _ => panic!("Expected ConnectionError, got: {:?}", err),
982 }
983 assert!(
985 elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
986 "Expected timeout around 200ms, got: {:?}",
987 elapsed
988 );
989
990 let client_long_timeout = create_test_hashflow_client(
994 format!("http://127.0.0.1:{}/rfq", addr.port()),
995 Duration::from_secs(1),
996 );
997
998 let result = client_long_timeout
1000 .request_binding_quote(¶ms)
1001 .await;
1002
1003 assert!(result.is_ok(), "Expected success, got: {:?}", result);
1005 }
1006
1007 async fn create_retry_server() -> (std::net::SocketAddr, std::sync::Arc<std::sync::Mutex<u32>>)
1009 {
1010 use std::sync::{Arc, Mutex};
1011
1012 use tokio::{io::AsyncWriteExt, net::TcpListener};
1013
1014 let request_count = Arc::new(Mutex::new(0u32));
1015 let request_count_clone = request_count.clone();
1016
1017 let listener = TcpListener::bind("127.0.0.1:0")
1018 .await
1019 .unwrap();
1020 let addr = listener.local_addr().unwrap();
1021
1022 let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
1023
1024 tokio::spawn(async move {
1025 while let Ok((mut stream, _)) = listener.accept().await {
1026 let count_clone = request_count_clone.clone();
1027 let json_response_clone = json_response.to_owned();
1028 tokio::spawn(async move {
1029 *count_clone.lock().unwrap() += 1;
1030 let count = *count_clone.lock().unwrap();
1031 println!("Mock server: Received request #{count}");
1032
1033 if count <= 2 {
1034 let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1035 let _ = stream
1036 .write_all(response.as_bytes())
1037 .await;
1038 } else {
1039 let response = format!(
1040 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1041 json_response_clone.len(),
1042 json_response_clone
1043 );
1044 let _ = stream
1045 .write_all(response.as_bytes())
1046 .await;
1047 }
1048 let _ = stream.flush().await;
1049 let _ = stream.shutdown().await;
1050 });
1051 }
1052 });
1053
1054 tokio::time::sleep(Duration::from_millis(50)).await;
1055 (addr, request_count)
1056 }
1057
1058 #[tokio::test]
1059 async fn test_hashflow_quote_retry_on_bad_response() {
1060 let (addr, request_count) = create_retry_server().await;
1061
1062 let client = create_test_hashflow_client(
1063 format!("http://127.0.0.1:{}/rfq", addr.port()),
1064 Duration::from_secs(5),
1065 );
1066 let params = create_test_quote_params();
1067 let result = client
1068 .request_binding_quote(¶ms)
1069 .await;
1070
1071 assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1072 let quote = result.unwrap();
1073
1074 assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
1076 assert_eq!(quote.amount_out, BigUint::from(3329502u64));
1077
1078 let final_count = *request_count.lock().unwrap();
1080 assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1081 }
1082
1083 #[test]
1084 fn test_hashflow_client_serialize_deserialize_roundtrip() {
1085 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1086 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1087 let quote_token = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1088
1089 let original = HashflowClient {
1090 chain: Chain::Ethereum,
1091 price_levels_endpoint: "https://api.hashflow.com/price_levels".to_string(),
1092 market_makers_endpoint: "https://api.hashflow.com/market_makers".to_string(),
1093 quote_endpoint: "https://api.hashflow.com/quote".to_string(),
1094 tokens: HashSet::from([token_in.clone(), token_out.clone()]),
1095 tvl: 50.5,
1096 auth_key: "secret_key".to_string(),
1097 auth_user: "secret_user".to_string(),
1098 quote_tokens: HashSet::from([quote_token.clone()]),
1099 poll_time: Duration::from_secs(10),
1100 quote_timeout: Duration::from_millis(5500),
1101 };
1102
1103 let serialized = serde_json::to_string(&original).unwrap();
1104 let deserialized: HashflowClient = serde_json::from_str(&serialized).unwrap();
1105
1106 assert_eq!(deserialized.chain, original.chain);
1108 assert_eq!(deserialized.price_levels_endpoint, original.price_levels_endpoint);
1109 assert_eq!(deserialized.market_makers_endpoint, original.market_makers_endpoint);
1110 assert_eq!(deserialized.quote_endpoint, original.quote_endpoint);
1111 assert_eq!(deserialized.tokens, original.tokens);
1112 assert_eq!(deserialized.tvl, original.tvl);
1113 assert_eq!(deserialized.quote_tokens, original.quote_tokens);
1114 assert_eq!(deserialized.poll_time, original.poll_time);
1115 assert_eq!(deserialized.quote_timeout, original.quote_timeout);
1116
1117 assert_eq!(deserialized.auth_key, "");
1119 assert_eq!(deserialized.auth_user, "");
1120 assert_ne!(deserialized.auth_key, original.auth_key);
1121 assert_ne!(deserialized.auth_user, original.auth_user);
1122 }
1123
1124 #[test]
1125 fn test_hashflow_client_deserialize_with_credentials() {
1126 let json = r#"{
1129 "chain": "ethereum",
1130 "price_levels_endpoint": "https://api.hashflow.com/price_levels",
1131 "market_makers_endpoint": "https://api.hashflow.com/market_makers",
1132 "quote_endpoint": "https://api.hashflow.com/quote",
1133 "tokens": [],
1134 "tvl": 10.0,
1135 "auth_key": "provided_key",
1136 "auth_user": "provided_user",
1137 "quote_tokens": [],
1138 "poll_time": {"secs": 10, "nanos": 0},
1139 "quote_timeout": {"secs": 30, "nanos": 0}
1140 }"#;
1141
1142 let client: HashflowClient = serde_json::from_str(json).unwrap();
1143
1144 assert_eq!(client.auth_key, "provided_key");
1146 assert_eq!(client.auth_user, "provided_user");
1147 }
1148}