1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address, U256};
8use async_trait::async_trait;
9use futures::stream::BoxStream;
10use num_bigint::BigUint;
11use reqwest::Client;
12use serde::{Deserialize, Serialize};
13use tokio::time::{interval, timeout, Duration};
14use tracing::{error, info, warn};
15use tycho_common::{
16 models::{protocol::GetAmountOutParams, Chain},
17 simulation::indicatively_priced::SignedQuote,
18 Bytes,
19};
20
21use crate::{
22 evm::protocol::u256_num::biguint_to_u256,
23 rfq::{
24 client::RFQClient,
25 errors::RFQError,
26 models::TimestampHeader,
27 protocols::hashflow::models::{
28 HashflowChain, HashflowMarketMakerLevels, HashflowMarketMakersResponse,
29 HashflowPriceLevelsResponse, HashflowQuoteRequest, HashflowQuoteResponse, HashflowRFQ,
30 },
31 },
32 tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
33 tycho_common::models::protocol::{ProtocolComponent, ProtocolComponentState},
34};
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
37pub struct HashflowClient {
38 chain: Chain,
39 price_levels_endpoint: String,
40 market_makers_endpoint: String,
41 quote_endpoint: String,
42 tokens: HashSet<Bytes>,
44 tvl: f64,
46 #[serde(skip_serializing, default)]
47 auth_key: String,
48 #[serde(skip_serializing, default)]
49 auth_user: String,
50 quote_tokens: HashSet<Bytes>,
52 poll_time: Duration,
53 quote_timeout: Duration,
54}
55
56impl HashflowClient {
57 pub const PROTOCOL_SYSTEM: &'static str = "rfq:hashflow";
58
59 #[allow(clippy::too_many_arguments)]
60 pub fn new(
61 chain: Chain,
62 tokens: HashSet<Bytes>,
63 tvl: f64,
64 quote_tokens: HashSet<Bytes>,
65 auth_user: String,
66 auth_key: String,
67 poll_time: Duration,
68 quote_timeout: Duration,
69 ) -> Result<Self, RFQError> {
70 Ok(Self {
71 chain,
72 price_levels_endpoint: "https://api.hashflow.com/taker/v3/price-levels".to_string(),
73 market_makers_endpoint: "https://api.hashflow.com/taker/v3/market-makers".to_string(),
74 quote_endpoint: "https://api.hashflow.com/taker/v3/rfq".to_string(),
75 tokens,
76 tvl,
77 auth_key,
78 auth_user,
79 quote_tokens,
80 poll_time,
81 quote_timeout,
82 })
83 }
84
85 fn normalize_tvl(
88 &self,
89 raw_tvl: f64,
90 quote_token: Bytes,
91 levels_by_mm: &HashMap<String, Vec<HashflowMarketMakerLevels>>,
92 ) -> Result<f64, RFQError> {
93 if self.quote_tokens.contains("e_token) {
95 return Ok(raw_tvl);
96 }
97
98 for approved_quote_token in &self.quote_tokens {
101 for mm_levels_inner in levels_by_mm.values() {
102 for quote_mm_level in mm_levels_inner {
103 if quote_mm_level.pair.base_token == quote_token &&
105 quote_mm_level.pair.quote_token == *approved_quote_token
106 {
107 if let Some(price) = quote_mm_level.get_price(1.0) {
108 return Ok(raw_tvl * price);
109 }
110 }
111 }
112 }
113 }
114
115 Ok(0.0)
117 }
118
119 fn create_component_with_state(
120 &self,
121 component_id: String,
122 tokens: Vec<Bytes>,
123 mm_name: &str,
124 mm_level: &HashflowMarketMakerLevels,
125 tvl: f64,
126 ) -> ComponentWithState {
127 let protocol_component = ProtocolComponent {
128 id: component_id.clone(),
129 protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
130 protocol_type_name: "hashflow_pool".to_string(),
131 chain: self.chain,
132 tokens,
133 contract_addresses: vec![], ..Default::default()
135 };
136
137 let mut attributes = HashMap::new();
138
139 if !mm_level.levels.is_empty() {
141 let levels_json = serde_json::to_string(&mm_level.levels).unwrap_or_default();
142 attributes.insert("levels".to_string(), levels_json.as_bytes().to_vec().into());
143 }
144 attributes.insert("mm".to_string(), mm_name.as_bytes().to_vec().into());
145
146 ComponentWithState {
147 state: ProtocolComponentState::new(&component_id, attributes, HashMap::new()),
148 component: protocol_component,
149 component_tvl: Some(tvl),
150 entrypoints: vec![],
151 }
152 }
153
154 async fn fetch_market_makers(&mut self) -> Result<Vec<String>, RFQError> {
155 let query_params = vec![
156 ("source", self.auth_user.clone()),
157 ("baseChainType", "evm".to_string()),
158 ("baseChainId", self.chain.id().to_string()),
159 ];
160
161 let http_client = Client::new();
162 let request = http_client
163 .get(&self.market_makers_endpoint)
164 .query(&query_params)
165 .header("accept", "application/json")
166 .header("Authorization", &self.auth_key);
167
168 let response = request.send().await.map_err(|e| {
169 RFQError::ConnectionError(format!("Failed to fetch market makers: {e}"))
170 })?;
171
172 if !response.status().is_success() {
173 return Err(RFQError::ConnectionError(format!(
174 "HTTP error {}: {}",
175 response.status(),
176 response
177 .text()
178 .await
179 .unwrap_or_default()
180 )));
181 }
182
183 let mm_response: HashflowMarketMakersResponse = response.json().await.map_err(|e| {
184 RFQError::ParsingError(format!("Failed to parse market makers response: {e}"))
185 })?;
186
187 info!(
188 "Fetched {} market makers: {:?}",
189 mm_response.market_makers.len(),
190 mm_response.market_makers
191 );
192
193 Ok(mm_response.market_makers)
194 }
195
196 async fn fetch_price_levels(
197 &self,
198 market_makers: &Vec<String>,
199 ) -> Result<HashMap<String, Vec<HashflowMarketMakerLevels>>, RFQError> {
200 let mut query_params = vec![
201 ("source", self.auth_user.clone()),
202 ("baseChainType", "evm".to_string()),
203 ("baseChainId", self.chain.id().to_string()),
204 ];
205
206 for mm in market_makers {
208 query_params.push(("marketMakers[]", mm.clone()));
209 }
210
211 let http_client = Client::new();
212 let request = http_client
213 .get(&self.price_levels_endpoint)
214 .query(&query_params)
215 .header("accept", "application/json")
216 .header("Authorization", &self.auth_key);
217
218 let response = request
219 .send()
220 .await
221 .map_err(|e| RFQError::ConnectionError(format!("Failed to fetch price levels: {e}")))?;
222
223 if !response.status().is_success() {
224 return Err(RFQError::ConnectionError(format!(
225 "HTTP error {}: {}",
226 response.status(),
227 response
228 .text()
229 .await
230 .unwrap_or_default()
231 )));
232 }
233
234 let price_response: HashflowPriceLevelsResponse = response.json().await.map_err(|e| {
235 RFQError::ParsingError(format!("Failed to parse price levels response: {e}"))
236 })?;
237
238 if price_response.status != "success" {
239 return Err(RFQError::InvalidInput(format!(
240 "API returned error status: {}",
241 price_response.error.unwrap_or_default()
242 )));
243 }
244
245 price_response
246 .levels
247 .ok_or_else(|| RFQError::ParsingError("API response missing levels".to_string()))
248 }
249}
250
251#[async_trait]
252impl RFQClient for HashflowClient {
253 fn stream(
254 &self,
255 ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
256 let mut client = self.clone();
257
258 Box::pin(async_stream::stream! {
259 let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
260 let mut ticker = interval(client.poll_time);
261
262 info!("Starting Hashflow price levels polling every {} seconds", client.poll_time.as_secs());
263 info!("TVL threshold: {:.2}", client.tvl);
264
265 loop {
266 ticker.tick().await;
267
268 let market_makers;
269 match client.fetch_market_makers().await {
270 Ok(mms) => {
271 market_makers = mms;
272 info!("Successfully fetched market makers");
273 }
274 Err(e) => {
275 info!("Failed to fetch market makers: {}", e);
276 continue;
277 }
278 }
279
280 match client.fetch_price_levels(&market_makers).await {
281 Ok(levels_by_mm) => {
282 let mut new_components = HashMap::new();
283
284 info!("Fetched price levels from {} market makers", levels_by_mm.len());
285 for (mm_name, mm_levels) in levels_by_mm.iter() {
287 for mm_level in mm_levels {
288 let base_token = &mm_level.pair.base_token;
289 let quote_token = &mm_level.pair.quote_token;
290
291 if client.tokens.contains(base_token) && client.tokens.contains(quote_token) {
293 let tokens = vec![base_token.clone(), quote_token.clone()];
294 let tvl = mm_level.calculate_tvl();
295
296 let normalized_tvl = client.normalize_tvl(
298 tvl,
299 mm_level.pair.quote_token.clone(),
300 &levels_by_mm,
301 )?;
302
303 let pair_str = format!("hashflow_{}/{}", hex::encode(base_token), hex::encode(quote_token));
305 let component_id = format!("{}", keccak256(pair_str.as_bytes()));
306
307 if normalized_tvl < client.tvl {
308 info!("Filtering out component {} due to low TVL: {:.2} < {:.2}",
309 component_id, normalized_tvl, client.tvl);
310 continue;
311 }
312
313 let component_with_state = client.create_component_with_state(
314 component_id.clone(),
315 tokens,
316 mm_name,
317 mm_level,
318 normalized_tvl
319 );
320 new_components.insert(component_id, component_with_state);
321 }
322 }
323 }
324
325 let removed_components: HashMap<String, ProtocolComponent> = current_components
327 .iter()
328 .filter(|&(id, _)| !new_components.contains_key(id))
329 .map(|(k, v)| (k.clone(), v.component.clone()))
330 .collect();
331
332 current_components = new_components.clone();
334
335 let snapshot = Snapshot {
336 states: new_components,
337 vm_storage: HashMap::new(),
338 };
339 let timestamp = SystemTime::now().duration_since(
340 SystemTime::UNIX_EPOCH
341 ).map_err(
342 |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
343 )?.as_secs();
344
345 let msg = StateSyncMessage::<TimestampHeader> {
346 header: TimestampHeader { timestamp },
347 snapshots: snapshot,
348 deltas: None,
349 removed_components,
350 };
351
352 yield Ok(("hashflow".to_string(), msg));
353 },
354 Err(e) => {
355 error!("Failed to fetch price levels from Hashflow API: {}", e);
356 continue;
357 }
358 }
359 }
360 })
361 }
362
363 async fn request_binding_quote(
364 &self,
365 params: &GetAmountOutParams,
366 ) -> Result<SignedQuote, RFQError> {
367 let hashflow_chain = HashflowChain::from(self.chain);
368 let quote_request = HashflowQuoteRequest {
369 source: self.auth_user.clone(),
370 base_chain: hashflow_chain.clone(),
371 quote_chain: hashflow_chain,
372 rfqs: vec![HashflowRFQ {
373 base_token: params.token_in.to_string(),
374 quote_token: params.token_out.to_string(),
375 base_token_amount: Some(params.amount_in.to_string()),
376 quote_token_amount: None,
377 trader: params.receiver.to_string(),
378 effective_trader: None,
379 }],
380 calldata: false,
381 };
382
383 let url = self.quote_endpoint.clone();
384
385 let start_time = std::time::Instant::now();
386 const MAX_RETRIES: u32 = 3;
387 let mut last_error = None;
388
389 for attempt in 0..MAX_RETRIES {
390 let elapsed = start_time.elapsed();
392 if elapsed >= self.quote_timeout {
393 return Err(last_error.unwrap_or_else(|| {
394 RFQError::ConnectionError(format!(
395 "Hashflow quote request timed out after {} seconds",
396 self.quote_timeout.as_secs()
397 ))
398 }));
399 }
400
401 let remaining_time = self.quote_timeout - elapsed;
402
403 let http_client = Client::new();
404 let request = http_client
405 .post(&url)
406 .json("e_request)
407 .header("accept", "application/json")
408 .header("Authorization", &self.auth_key);
409
410 let response = match timeout(remaining_time, request.send()).await {
411 Ok(Ok(resp)) => resp,
412 Ok(Err(e)) => {
413 warn!(
414 "Hashflow quote request failed (attempt {}/{}): {}",
415 attempt + 1,
416 MAX_RETRIES,
417 e
418 );
419 last_error = Some(RFQError::ConnectionError(format!(
420 "Failed to send Hashflow quote request: {e}"
421 )));
422 if attempt < MAX_RETRIES - 1 {
423 tokio::time::sleep(Duration::from_millis(100)).await;
424 continue;
425 } else {
426 return Err(last_error.unwrap());
427 }
428 }
429 Err(_) => {
430 return Err(RFQError::ConnectionError(format!(
431 "Hashflow quote request timed out after {} seconds",
432 self.quote_timeout.as_secs()
433 )));
434 }
435 };
436
437 if response.status() != 200 {
438 let err_msg = match response.text().await {
439 Ok(text) => text,
440 Err(e) => {
441 warn!(
442 "Hashflow error response parsing failed (attempt {}/{}): {}",
443 attempt + 1,
444 MAX_RETRIES,
445 e
446 );
447 last_error = Some(RFQError::ParsingError(format!(
448 "Failed to read response text from Hashflow failed request: {e}"
449 )));
450 if attempt < MAX_RETRIES - 1 {
451 tokio::time::sleep(Duration::from_millis(100)).await;
452 continue;
453 } else {
454 return Err(last_error.unwrap());
455 }
456 }
457 };
458 last_error = Some(RFQError::FatalError(format!(
459 "Failed to send Hashflow quote request: {err_msg}",
460 )));
461 if attempt < MAX_RETRIES - 1 {
462 warn!(
463 "Hashflow returned non-200 status (attempt {}/{}): {}",
464 attempt + 1,
465 MAX_RETRIES,
466 err_msg
467 );
468 tokio::time::sleep(Duration::from_millis(100)).await;
469 continue;
470 } else {
471 return Err(last_error.unwrap());
472 }
473 }
474
475 let quote_response = match response
476 .json::<HashflowQuoteResponse>()
477 .await
478 {
479 Ok(resp) => resp,
480 Err(e) => {
481 warn!(
482 "Hashflow quote response parsing failed (attempt {}/{}): {}",
483 attempt + 1,
484 MAX_RETRIES,
485 e
486 );
487 last_error = Some(RFQError::ParsingError(format!(
488 "Failed to parse Hashflow quote response: {e}"
489 )));
490 if attempt < MAX_RETRIES - 1 {
491 tokio::time::sleep(Duration::from_millis(100)).await;
492 continue;
493 } else {
494 return Err(last_error.unwrap());
495 }
496 }
497 };
498
499 match quote_response.status.as_str() {
500 "success" => {
501 if let Some(quotes) = quote_response.quotes {
502 if quotes.is_empty() {
503 return Err(RFQError::QuoteNotFound(format!(
504 "Hashflow quote not found for {} {} ->{}",
505 params.amount_in, params.token_in, params.token_out,
506 )));
507 }
508 let quote = quotes[0].clone();
510 quote.validate(params)?;
511
512 let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
513 quote_attributes.insert("pool".to_string(), quote.quote_data.pool);
514 if let Some(external_account) = quote.quote_data.external_account {
515 quote_attributes
516 .insert("external_account".to_string(), external_account);
517 } else {
518 quote_attributes.insert(
519 "external_account".to_string(),
520 Bytes::from_str(&Address::ZERO.to_string()).map_err(|_| {
521 RFQError::ParsingError(
522 "Failed to parse zero address".to_string(),
523 )
524 })?,
525 );
526 }
527 quote_attributes.insert("trader".to_string(), quote.quote_data.trader);
528 quote_attributes
529 .insert("base_token".to_string(), quote.quote_data.base_token);
530 quote_attributes
531 .insert("quote_token".to_string(), quote.quote_data.quote_token);
532 quote_attributes.insert(
533 "base_token_amount".to_string(),
534 Bytes::from(
535 biguint_to_u256(
536 &BigUint::from_str("e.quote_data.base_token_amount)
537 .map_err(|_| {
538 RFQError::ParsingError(format!(
539 "Failed to parse base token amount: {}",
540 quote.quote_data.base_token_amount
541 ))
542 })?,
543 )
544 .to_be_bytes::<32>()
545 .to_vec(),
546 ),
547 );
548 quote_attributes.insert(
549 "quote_token_amount".to_string(),
550 Bytes::from(
551 biguint_to_u256(
552 &BigUint::from_str("e.quote_data.quote_token_amount)
553 .map_err(|_| {
554 RFQError::ParsingError(format!(
555 "Failed to parse quote token amount: {}",
556 quote.quote_data.quote_token_amount
557 ))
558 })?,
559 )
560 .to_be_bytes::<32>()
561 .to_vec(),
562 ),
563 );
564 quote_attributes.insert(
565 "quote_expiry".to_string(),
566 Bytes::from(
567 U256::from(quote.quote_data.quote_expiry)
568 .to_be_bytes::<32>()
569 .to_vec(),
570 ),
571 );
572 quote_attributes.insert(
573 "nonce".to_string(),
574 Bytes::from(
575 U256::from(quote.quote_data.nonce)
576 .to_be_bytes::<32>()
577 .to_vec(),
578 ),
579 );
580 quote_attributes.insert("tx_id".to_string(), quote.quote_data.tx_id);
581 quote_attributes.insert("signature".to_string(), quote.signature);
582
583 let signed_quote = SignedQuote {
584 base_token: params.token_in.clone(),
585 quote_token: params.token_out.clone(),
586 amount_in: BigUint::from_str("e.quote_data.base_token_amount)
587 .map_err(|_| {
588 RFQError::ParsingError(format!(
589 "Failed to parse amount in string: {}",
590 quote.quote_data.base_token_amount
591 ))
592 })?,
593 amount_out: BigUint::from_str("e.quote_data.quote_token_amount)
594 .map_err(|_| {
595 RFQError::ParsingError(format!(
596 "Failed to parse amount out string: {}",
597 quote.quote_data.quote_token_amount
598 ))
599 })?,
600 quote_attributes,
601 };
602 return Ok(signed_quote);
603 } else {
604 return Err(RFQError::QuoteNotFound(format!(
605 "Hashflow quote not found for {} {} ->{}",
606 params.amount_in, params.token_in, params.token_out,
607 )));
608 }
609 }
610 "fail" => {
611 return Err(RFQError::FatalError(format!(
612 "Hashflow API error: {:?}",
613 quote_response.error
614 )));
615 }
616 _ => {
617 return Err(RFQError::FatalError(
618 "Hashflow API error: Unknown status".to_string(),
619 ));
620 }
621 }
622 }
623
624 Err(last_error.unwrap_or_else(|| {
625 RFQError::ConnectionError("Hashflow quote request failed after retries".to_string())
626 }))
627 }
628}
629
630#[cfg(test)]
631mod tests {
632 use std::{env, str::FromStr, time::Duration};
633
634 use dotenv::dotenv;
635 use futures::StreamExt;
636 use tokio::time::timeout;
637
638 use super::*;
639 use crate::rfq::{
640 constants::get_hashflow_auth,
641 protocols::hashflow::models::{HashflowPair, HashflowPriceLevel},
642 };
643
644 #[test]
645 fn test_normalize_tvl_same_quote_token() {
646 let client = create_test_client();
647 let levels = HashMap::new();
648
649 let result = client.normalize_tvl(
651 1000.0,
652 Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
653 &levels,
654 );
655 assert!(result.is_ok());
656 assert_eq!(result.unwrap(), 1000.0);
657 }
658
659 #[test]
660 fn test_normalize_tvl_different_quote_token() {
661 let client = create_test_client();
662 let mut levels = HashMap::new();
663 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
664 let usdc = Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
665
666 let eth_usdc_level = HashflowMarketMakerLevels {
668 pair: HashflowPair { base_token: weth.clone(), quote_token: usdc },
669 levels: vec![
670 HashflowPriceLevel { quantity: 1.0, price: 3000.0 }, ],
672 };
673
674 levels.insert("test_mm".to_string(), vec![eth_usdc_level]);
675
676 let result = client.normalize_tvl(2.0, weth, &levels);
678 assert!(result.is_ok());
679 assert_eq!(result.unwrap(), 6000.0);
681 }
682
683 #[test]
684 fn test_normalize_tvl_no_conversion_available() {
685 let client = create_test_client();
686 let levels = HashMap::new();
687 let result = client.normalize_tvl(
688 1000.0,
689 Bytes::from_str("0x1234567890123456789012345678901234567890").unwrap(),
690 &levels,
691 );
692 assert!(result.is_ok());
693 assert_eq!(result.unwrap(), 0.0);
694 }
695
696 fn create_test_client() -> HashflowClient {
697 let quote_tokens = HashSet::from([
698 Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(), ]);
701
702 HashflowClient::new(
703 Chain::Ethereum,
704 HashSet::new(),
705 1.0,
706 quote_tokens,
707 "test_user".to_string(),
708 "test_key".to_string(),
709 Duration::from_secs(5),
710 Duration::from_secs(5),
711 )
712 .unwrap()
713 }
714
715 #[tokio::test]
716 #[ignore] async fn test_hashflow_api_polling() {
718 dotenv().expect("Missing .env file");
719 let auth = get_hashflow_auth().unwrap();
720
721 let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
722 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
723
724 let tokens = HashSet::from([wbtc, weth.clone()]);
725
726 let quote_tokens = HashSet::from([
727 Bytes::from_str("0xa0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(), Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), ]);
730
731 let client = HashflowClient::new(
732 Chain::Ethereum,
733 tokens,
734 1.0, quote_tokens,
736 auth.user,
737 auth.key,
738 Duration::from_secs(1),
739 Duration::from_secs(5),
740 )
741 .unwrap();
742
743 let mut stream = client.stream();
744
745 let result = timeout(Duration::from_secs(10), async {
746 let mut message_count = 0;
747 let max_messages = 3;
748 let mut total_components_received = 0;
749
750 while let Some(result) = stream.next().await {
751 match result {
752 Ok((component_id, msg)) => {
753 println!("Received message with ID: {component_id}");
754
755 assert!(!component_id.is_empty());
756 assert_eq!(component_id, "hashflow");
757 assert!(msg.header.timestamp > 0);
758
759 let snapshot = &msg.snapshots;
760 total_components_received += snapshot.states.len();
761
762 println!("Received {} components in this message (Total so far: {})",
763 snapshot.states.len(), total_components_received);
764
765 for (id, component_with_state) in &snapshot.states {
766 let attributes = &component_with_state.state.attributes;
767 let levels: &Bytes = attributes.get("levels").unwrap();
768 if attributes.contains_key("levels") {
770 println!("{levels:?}");
771 assert!(!attributes["levels"].is_empty());
772 }
773 if attributes.contains_key("mm") {
775 assert!(!attributes["mm"].is_empty());
776 }
777
778 if let Some(tvl) = component_with_state.component_tvl {
779 assert!(tvl >= 1.0);
780 println!("Component {id} TVL: ${tvl:.2}");
781 }
782 }
783
784 message_count += 1;
785 if message_count >= max_messages {
786 break;
787 }
788 }
789 Err(e) => {
790 panic!("Stream error: {e}");
791 }
792 }
793 }
794
795 assert!(message_count > 0, "Should have received at least one message");
796 assert!(total_components_received >= 1, "Should have received at least 1 component with $1 TVL threshold");
797 println!("Successfully received {message_count} messages with {total_components_received} total components");
798 })
799 .await;
800
801 match result {
802 Ok(_) => println!("Test completed successfully"),
803 Err(_) => panic!("Test timed out - no messages received within 5 seconds"),
804 }
805 }
806
807 #[tokio::test]
808 #[ignore] async fn test_request_binding_quote() {
810 let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
811 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
812
813 let auth_user = String::from("propellerheads");
814 dotenv().expect("Missing .env file");
815 let auth_key = env::var("HASHFLOW_KEY").unwrap();
816
817 let client = HashflowClient::new(
818 Chain::Ethereum,
819 HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
820 10.0,
821 HashSet::new(),
822 auth_user,
823 auth_key,
824 Duration::from_secs(0),
825 Duration::from_secs(5),
826 )
827 .unwrap();
828
829 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
830
831 let params = GetAmountOutParams {
832 amount_in: BigUint::from(1_000000000000000000u64),
833 token_in: weth.clone(),
834 token_out: wbtc.clone(),
835 sender: router.clone(),
836 receiver: router.clone(),
837 };
838 let quote = client
839 .request_binding_quote(¶ms)
840 .await
841 .unwrap();
842
843 assert_eq!(quote.base_token, weth);
844 assert_eq!(quote.quote_token, wbtc);
845 assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
846
847 assert!(quote.amount_out > BigUint::from(3000000u64));
849
850 assert_eq!(quote.quote_attributes.len(), 11);
851 let expected_attributes = [
852 "pool",
853 "external_account",
854 "trader",
855 "base_token",
856 "quote_token",
857 "base_token_amount",
858 "quote_token_amount",
859 "quote_expiry",
860 "nonce",
861 "tx_id",
862 "signature",
863 ];
864 for attr in expected_attributes {
865 assert!(
866 quote
867 .quote_attributes
868 .contains_key(attr),
869 "Missing attribute: {attr}"
870 );
871 }
872 assert_eq!(
873 quote
874 .quote_attributes
875 .get("trader")
876 .unwrap(),
877 &router
878 );
879 }
880
881 async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
883 use tokio::{io::AsyncWriteExt, net::TcpListener};
884
885 let listener = TcpListener::bind("127.0.0.1:0")
886 .await
887 .unwrap();
888 let addr = listener.local_addr().unwrap();
889
890 let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
891
892 tokio::spawn(async move {
893 while let Ok((mut stream, _)) = listener.accept().await {
894 let json_response_clone = json_response.to_owned();
895 tokio::spawn(async move {
896 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
897 let response = format!(
898 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
899 json_response_clone.len(),
900 json_response_clone
901 );
902 let _ = stream
903 .write_all(response.as_bytes())
904 .await;
905 let _ = stream.flush().await;
906 let _ = stream.shutdown().await;
907 });
908 }
909 });
910
911 tokio::time::sleep(Duration::from_millis(50)).await;
912 addr
913 }
914
915 fn create_test_hashflow_client(
916 quote_endpoint: String,
917 quote_timeout: Duration,
918 ) -> HashflowClient {
919 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
920 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
921
922 HashflowClient {
923 chain: Chain::Ethereum,
924 price_levels_endpoint: "http://unused/price-levels".to_string(),
925 market_makers_endpoint: "http://unused/market-makers".to_string(),
926 quote_endpoint,
927 tokens: HashSet::from([token_in, token_out]),
928 tvl: 10.0,
929 auth_key: "test_key".to_string(),
930 auth_user: "test_user".to_string(),
931 quote_tokens: HashSet::new(),
932 poll_time: Duration::from_secs(0),
933 quote_timeout,
934 }
935 }
936
937 fn create_test_quote_params() -> GetAmountOutParams {
939 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
940 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
941 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
942
943 GetAmountOutParams {
944 amount_in: BigUint::from(1_000000000000000000u64),
945 token_in,
946 token_out,
947 sender: router.clone(),
948 receiver: router,
949 }
950 }
951
952 #[tokio::test]
953 async fn test_hashflow_quote_timeout() {
954 let addr = create_delayed_response_server(500).await;
955
956 let client_short_timeout = create_test_hashflow_client(
958 format!("http://127.0.0.1:{}/rfq", addr.port()),
959 Duration::from_millis(200),
960 );
961 let params = create_test_quote_params();
962
963 let start = std::time::Instant::now();
965 let result = client_short_timeout
966 .request_binding_quote(¶ms)
967 .await;
968 let elapsed = start.elapsed();
969
970 assert!(result.is_err());
972 let err = result.unwrap_err();
973 match err {
974 RFQError::ConnectionError(msg) => {
975 assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
976 }
977 _ => panic!("Expected ConnectionError, got: {:?}", err),
978 }
979 assert!(
981 elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
982 "Expected timeout around 200ms, got: {:?}",
983 elapsed
984 );
985
986 let client_long_timeout = create_test_hashflow_client(
990 format!("http://127.0.0.1:{}/rfq", addr.port()),
991 Duration::from_secs(1),
992 );
993
994 let result = client_long_timeout
996 .request_binding_quote(¶ms)
997 .await;
998
999 assert!(result.is_ok(), "Expected success, got: {:?}", result);
1001 }
1002
1003 async fn create_retry_server() -> (std::net::SocketAddr, std::sync::Arc<std::sync::Mutex<u32>>)
1005 {
1006 use std::sync::{Arc, Mutex};
1007
1008 use tokio::{io::AsyncWriteExt, net::TcpListener};
1009
1010 let request_count = Arc::new(Mutex::new(0u32));
1011 let request_count_clone = request_count.clone();
1012
1013 let listener = TcpListener::bind("127.0.0.1:0")
1014 .await
1015 .unwrap();
1016 let addr = listener.local_addr().unwrap();
1017
1018 let json_response = r#"{"status":"success","error":null,"rfqId":"test-rfq-id","internalRfqIds":null,"quotes":[{"quoteData":{"pool":"0x71D9750ECF0c5081FAE4E3EDC4253E52024b0B59","externalAccount":null,"trader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","effectiveTrader":"0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35","baseToken":"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","baseTokenAmount":"1000000000000000000","quoteToken":"0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599","quoteTokenAmount":"3329502","quoteExpiry":1707847360,"nonce":1707844960943648659,"txid":"0x0000000000000000000000000000000000000000000000000000000000000001"},"signature":"0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef12"}]}"#;
1019
1020 tokio::spawn(async move {
1021 while let Ok((mut stream, _)) = listener.accept().await {
1022 let count_clone = request_count_clone.clone();
1023 let json_response_clone = json_response.to_owned();
1024 tokio::spawn(async move {
1025 *count_clone.lock().unwrap() += 1;
1026 let count = *count_clone.lock().unwrap();
1027 println!("Mock server: Received request #{count}");
1028
1029 if count <= 2 {
1030 let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1031 let _ = stream
1032 .write_all(response.as_bytes())
1033 .await;
1034 } else {
1035 let response = format!(
1036 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1037 json_response_clone.len(),
1038 json_response_clone
1039 );
1040 let _ = stream
1041 .write_all(response.as_bytes())
1042 .await;
1043 }
1044 let _ = stream.flush().await;
1045 let _ = stream.shutdown().await;
1046 });
1047 }
1048 });
1049
1050 tokio::time::sleep(Duration::from_millis(50)).await;
1051 (addr, request_count)
1052 }
1053
1054 #[tokio::test]
1055 async fn test_hashflow_quote_retry_on_bad_response() {
1056 let (addr, request_count) = create_retry_server().await;
1057
1058 let client = create_test_hashflow_client(
1059 format!("http://127.0.0.1:{}/rfq", addr.port()),
1060 Duration::from_secs(5),
1061 );
1062 let params = create_test_quote_params();
1063 let result = client
1064 .request_binding_quote(¶ms)
1065 .await;
1066
1067 assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1068 let quote = result.unwrap();
1069
1070 assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
1072 assert_eq!(quote.amount_out, BigUint::from(3329502u64));
1073
1074 let final_count = *request_count.lock().unwrap();
1076 assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1077 }
1078
1079 #[test]
1080 fn test_hashflow_client_serialize_deserialize_roundtrip() {
1081 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1082 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1083 let quote_token = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1084
1085 let original = HashflowClient {
1086 chain: Chain::Ethereum,
1087 price_levels_endpoint: "https://api.hashflow.com/price_levels".to_string(),
1088 market_makers_endpoint: "https://api.hashflow.com/market_makers".to_string(),
1089 quote_endpoint: "https://api.hashflow.com/quote".to_string(),
1090 tokens: HashSet::from([token_in.clone(), token_out.clone()]),
1091 tvl: 50.5,
1092 auth_key: "secret_key".to_string(),
1093 auth_user: "secret_user".to_string(),
1094 quote_tokens: HashSet::from([quote_token.clone()]),
1095 poll_time: Duration::from_secs(10),
1096 quote_timeout: Duration::from_millis(5500),
1097 };
1098
1099 let serialized = serde_json::to_string(&original).unwrap();
1100 let deserialized: HashflowClient = serde_json::from_str(&serialized).unwrap();
1101
1102 assert_eq!(deserialized.chain, original.chain);
1104 assert_eq!(deserialized.price_levels_endpoint, original.price_levels_endpoint);
1105 assert_eq!(deserialized.market_makers_endpoint, original.market_makers_endpoint);
1106 assert_eq!(deserialized.quote_endpoint, original.quote_endpoint);
1107 assert_eq!(deserialized.tokens, original.tokens);
1108 assert_eq!(deserialized.tvl, original.tvl);
1109 assert_eq!(deserialized.quote_tokens, original.quote_tokens);
1110 assert_eq!(deserialized.poll_time, original.poll_time);
1111 assert_eq!(deserialized.quote_timeout, original.quote_timeout);
1112
1113 assert_eq!(deserialized.auth_key, "");
1115 assert_eq!(deserialized.auth_user, "");
1116 assert_ne!(deserialized.auth_key, original.auth_key);
1117 assert_ne!(deserialized.auth_user, original.auth_user);
1118 }
1119
1120 #[test]
1121 fn test_hashflow_client_deserialize_with_credentials() {
1122 let json = r#"{
1125 "chain": "ethereum",
1126 "price_levels_endpoint": "https://api.hashflow.com/price_levels",
1127 "market_makers_endpoint": "https://api.hashflow.com/market_makers",
1128 "quote_endpoint": "https://api.hashflow.com/quote",
1129 "tokens": [],
1130 "tvl": 10.0,
1131 "auth_key": "provided_key",
1132 "auth_user": "provided_user",
1133 "quote_tokens": [],
1134 "poll_time": {"secs": 10, "nanos": 0},
1135 "quote_timeout": {"secs": 30, "nanos": 0}
1136 }"#;
1137
1138 let client: HashflowClient = serde_json::from_str(json).unwrap();
1139
1140 assert_eq!(client.auth_key, "provided_key");
1142 assert_eq!(client.auth_user, "provided_user");
1143 }
1144}