1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address};
8use async_trait::async_trait;
9use futures::{stream::BoxStream, StreamExt};
10use http::Request;
11use num_bigint::BigUint;
12use prost::Message as ProstMessage;
13use reqwest::Client;
14use tokio::time::{sleep, timeout, Duration};
15use tokio_tungstenite::{
16 connect_async_with_config,
17 tungstenite::{handshake::client::generate_key, Message},
18};
19use tracing::{error, info, warn};
20use tycho_common::{
21 models::{protocol::GetAmountOutParams, Chain},
22 simulation::indicatively_priced::SignedQuote,
23 Bytes,
24};
25
26use crate::{
27 rfq::{
28 client::RFQClient,
29 errors::RFQError,
30 models::TimestampHeader,
31 protocols::bebop::models::{
32 BebopOrderToSign, BebopPriceData, BebopPricingUpdate, BebopQuoteResponse,
33 },
34 },
35 tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
36 tycho_common::dto::{ProtocolComponent, ResponseProtocolState},
37};
38
39fn bytes_to_address(address: &Bytes) -> Result<Address, RFQError> {
40 if address.len() == 20 {
41 Ok(Address::from_slice(address))
42 } else {
43 Err(RFQError::InvalidInput(format!("Invalid ERC20 token address: {address:?}")))
44 }
45}
46
47fn chain_to_bebop_url(chain: Chain) -> Result<String, RFQError> {
49 let chain_path = match chain {
50 Chain::Ethereum => "ethereum",
51 Chain::Base => "base",
52 _ => return Err(RFQError::FatalError(format!("Unsupported chain: {chain:?}"))),
53 };
54 let url = format!("api.bebop.xyz/pmm/{chain_path}/v3");
55 Ok(url)
56}
57
/// Client for the Bebop RFQ protocol: streams indicative price levels over a WebSocket and
/// requests binding quotes over HTTP.
#[derive(Clone, Debug)]
pub struct BebopClient {
    /// Chain the published components are tagged with.
    chain: Chain,
    /// URL of the pricing WebSocket.
    price_ws: String,
    /// URL of the HTTP quote endpoint.
    quote_endpoint: String,
    /// Tokens of interest: a pair is only streamed when both its base and its quote token are
    /// members of this set.
    tokens: HashSet<Bytes>,
    /// Minimum TVL a pair must reach to be included in a streamed snapshot.
    tvl: f64,
    /// Sent as the `name` header on both connections and as the `source` query parameter of
    /// quote requests.
    ws_user: String,
    /// Sent as the `Authorization` header on both connections and as `source-auth` on quote
    /// requests.
    ws_key: String,
    /// Approved quote tokens. A pair quoted in any other token is only streamed when the same
    /// pricing update also contains a pair linking that token to one of these.
    quote_tokens: HashSet<Bytes>,
    /// Time budget for a binding quote request, shared across its retries.
    quote_timeout: Duration,
}
75
76impl BebopClient {
    /// Protocol system identifier assigned to every component this client creates.
    pub const PROTOCOL_SYSTEM: &'static str = "rfq:bebop";
78
79 pub fn new(
80 chain: Chain,
81 tokens: HashSet<Bytes>,
82 tvl: f64,
83 ws_user: String,
84 ws_key: String,
85 quote_tokens: HashSet<Bytes>,
86 quote_timeout: Duration,
87 ) -> Result<Self, RFQError> {
88 let url = chain_to_bebop_url(chain)?;
89 Ok(Self {
90 price_ws: "wss://".to_string() + &url + "/pricing?format=protobuf",
91 quote_endpoint: "https://".to_string() + &url + "/quote",
92 tokens,
93 chain,
94 tvl,
95 ws_user,
96 ws_key,
97 quote_tokens,
98 quote_timeout,
99 })
100 }
101
102 fn create_component_with_state(
103 &self,
104 component_id: String,
105 tokens: Vec<tycho_common::Bytes>,
106 price_data: &BebopPriceData,
107 tvl: f64,
108 ) -> ComponentWithState {
109 let protocol_component = ProtocolComponent {
110 id: component_id.clone(),
111 protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
112 protocol_type_name: "bebop_pool".to_string(),
113 chain: self.chain.into(),
114 tokens,
115 contract_ids: vec![], static_attributes: Default::default(),
117 change: Default::default(),
118 creation_tx: Default::default(),
119 created_at: Default::default(),
120 };
121
122 let mut attributes = HashMap::new();
123
124 if !price_data.bids.is_empty() {
128 let bids_pairs: Vec<(f32, f32)> = price_data
129 .bids
130 .chunks_exact(2)
131 .map(|chunk| (chunk[0], chunk[1]))
132 .collect();
133 let bids_json = serde_json::to_string(&bids_pairs).unwrap_or_default();
134 attributes.insert("bids".to_string(), bids_json.as_bytes().to_vec().into());
135 }
136 if !price_data.asks.is_empty() {
137 let asks_pairs: Vec<(f32, f32)> = price_data
138 .asks
139 .chunks_exact(2)
140 .map(|chunk| (chunk[0], chunk[1]))
141 .collect();
142 let asks_json = serde_json::to_string(&asks_pairs).unwrap_or_default();
143 attributes.insert("asks".to_string(), asks_json.as_bytes().to_vec().into());
144 }
145
146 ComponentWithState {
147 state: ResponseProtocolState {
148 component_id: component_id.clone(),
149 attributes,
150 balances: HashMap::new(),
151 },
152 component: protocol_component,
153 component_tvl: Some(tvl),
154 entrypoints: vec![],
155 }
156 }
157
158 fn process_quote_response(
159 quote_response: BebopQuoteResponse,
160 params: &GetAmountOutParams,
161 ) -> Result<SignedQuote, RFQError> {
162 match quote_response {
163 BebopQuoteResponse::Success(quote) => {
164 quote.validate(params)?;
165
166 let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
167 quote_attributes.insert("calldata".into(), quote.tx.data);
168 quote_attributes.insert(
169 "partial_fill_offset".into(),
170 Bytes::from(
171 quote
172 .partial_fill_offset
173 .to_be_bytes()
174 .to_vec(),
175 ),
176 );
177 let signed_quote = match quote.to_sign {
178 BebopOrderToSign::Single(ref single) => SignedQuote {
179 base_token: params.token_in.clone(),
180 quote_token: params.token_out.clone(),
181 amount_in: BigUint::from_str(&single.taker_amount).map_err(|_| {
182 RFQError::ParsingError(format!(
183 "Failed to parse amount in string: {}",
184 single.taker_amount
185 ))
186 })?,
187 amount_out: BigUint::from_str(&single.maker_amount).map_err(|_| {
188 RFQError::ParsingError(format!(
189 "Failed to parse amount out string: {}",
190 single.maker_amount
191 ))
192 })?,
193 quote_attributes,
194 },
195 BebopOrderToSign::Aggregate(aggregate) => {
196 let amount_in: BigUint = aggregate
198 .taker_tokens
199 .iter()
200 .zip(&aggregate.taker_amounts)
201 .flat_map(|(tokens, amounts)| {
202 tokens
203 .iter()
204 .zip(amounts)
205 .filter_map(|(token, amount)| {
206 if token == ¶ms.token_in {
207 BigUint::from_str(amount).ok()
208 } else {
209 None
210 }
211 })
212 })
213 .sum();
214
215 let amount_out: BigUint = aggregate
217 .maker_tokens
218 .iter()
219 .zip(&aggregate.maker_amounts)
220 .flat_map(|(tokens, amounts)| {
221 tokens
222 .iter()
223 .zip(amounts)
224 .filter_map(|(token, amount)| {
225 if token == ¶ms.token_out {
226 BigUint::from_str(amount).ok()
227 } else {
228 None
229 }
230 })
231 })
232 .sum();
233
234 SignedQuote {
235 base_token: params.token_in.clone(),
236 quote_token: params.token_out.clone(),
237 amount_in,
238 amount_out,
239 quote_attributes,
240 }
241 }
242 };
243
244 Ok(signed_quote)
245 }
246 BebopQuoteResponse::Error(err) => Err(RFQError::FatalError(format!(
247 "Bebop API error: code {} - {} (requestId: {})",
248 err.error.error_code, err.error.message, err.error.request_id
249 ))),
250 }
251 }
252}
253
254#[async_trait]
255impl RFQClient for BebopClient {
256 fn stream(
257 &self,
258 ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
259 let tokens = self.tokens.clone();
260 let url = self.price_ws.clone();
261 let tvl_threshold = self.tvl;
262 let name = self.ws_user.clone();
263 let authorization = self.ws_key.clone();
264 let client = self.clone();
265
266 Box::pin(async_stream::stream! {
267 let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
268 let mut reconnect_attempts = 0;
269 const MAX_RECONNECT_ATTEMPTS: u32 = 10;
270
271 loop {
272 let request = Request::builder()
273 .method("GET")
274 .uri(&url)
275 .header("Host", "api.bebop.xyz")
276 .header("Upgrade", "websocket")
277 .header("Connection", "Upgrade")
278 .header("Sec-WebSocket-Key", generate_key())
279 .header("Sec-WebSocket-Version", "13")
280 .header("name", &name)
281 .header("Authorization", &authorization)
282 .body(())
283 .map_err(|_| RFQError::FatalError("Failed to build request".into()))?;
284
285 let (ws_stream, _) = match connect_async_with_config(request, None, false).await {
287 Ok(connection) => {
288 info!("Successfully connected to Bebop WebSocket");
289 reconnect_attempts = 0; connection
291 },
292 Err(e) => {
293 reconnect_attempts += 1;
294 error!("Failed to connect to Bebop WebSocket (attempt {}): {}", reconnect_attempts, e);
295
296 if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
297 yield Err(RFQError::ConnectionError(format!("Failed to connect after {MAX_RECONNECT_ATTEMPTS} attempts: {e}")));
298 return;
299 }
300
301 let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
302 info!("Retrying connection in {} seconds...", backoff_duration.as_secs());
303 sleep(backoff_duration).await;
304 continue;
305 }
306 };
307
308 let (_, mut ws_receiver) = ws_stream.split();
309
310 while let Some(msg) = ws_receiver.next().await {
312 match msg {
313 Ok(Message::Binary(data)) => {
314 match BebopPricingUpdate::decode(&data[..]) {
315 Ok(protobuf_update) => {
316 let mut new_components = HashMap::new();
317
318 for price_data in &protobuf_update.pairs {
320 let base_bytes = Bytes::from(price_data.base.clone());
321 let quote_bytes = Bytes::from(price_data.quote.clone());
322 if tokens.contains(&base_bytes) && tokens.contains("e_bytes) {
323 let pair_tokens = vec![
324 base_bytes.clone(), quote_bytes.clone()
325 ];
326
327 let mut quote_price_data: Option<&BebopPriceData> = None;
328 if !client.quote_tokens.contains("e_bytes) {
331 for approved_quote_token in &client.quote_tokens {
332 if let Some(quote_data) = protobuf_update.pairs.iter()
335 .find(|p| {
336 (p.base == quote_bytes.as_ref() && p.quote == approved_quote_token.as_ref()) ||
337 (p.quote == quote_bytes.as_ref() && p.base == approved_quote_token.as_ref())
338 }) {
339 quote_price_data = Some(quote_data);
340 break;
341 }
342 }
343
344 if quote_price_data.is_none() {
347 warn!("Quote token {} does not have price levels in approved quote token. Skipping.", hex::encode("e_bytes));
348 continue;
349 }
350 }
351
352 let tvl = price_data.calculate_tvl(quote_price_data);
353 if tvl < tvl_threshold {
354 continue;
355 }
356
357 let pair_str = format!("bebop_{}/{}", hex::encode(&base_bytes), hex::encode("e_bytes));
358 let component_id = format!("{}", keccak256(pair_str.as_bytes()));
359 let component_with_state = client.create_component_with_state(
360 component_id.clone(),
361 pair_tokens,
362 price_data,
363 tvl
364 );
365 new_components.insert(component_id, component_with_state);
366 }
367 }
368
369 let removed_components: HashMap<String, ProtocolComponent> = current_components
373 .iter()
374 .filter(|&(id, _)| !new_components.contains_key(id))
375 .map(|(k, v)| (k.clone(), v.component.clone()))
376 .collect();
377
378 current_components = new_components.clone();
380
381 let snapshot = Snapshot {
382 states: new_components,
383 vm_storage: HashMap::new(),
384 };
385 let timestamp = SystemTime::now().duration_since(
386 SystemTime::UNIX_EPOCH
387 ).map_err(
388 |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
389 )?.as_secs();
390
391 let msg = StateSyncMessage::<TimestampHeader> {
392 header: TimestampHeader { timestamp },
393 snapshots: snapshot,
394 deltas: None, removed_components,
396 };
397
398 yield Ok(("bebop".to_string(), msg));
400 },
401 Err(e) => {
402 error!("Failed to parse protobuf message: {}", e);
403 break;
404 }
405 }
406 }
407 Ok(Message::Close(_)) => {
408 info!("WebSocket connection closed by server");
409 break;
410 }
411 Err(e) => {
412 error!("WebSocket error: {}", e);
413 break;
414 }
415 _ => {} }
417 }
418
419 reconnect_attempts += 1;
421 if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
422 yield Err(RFQError::ConnectionError(format!("Connection failed after {MAX_RECONNECT_ATTEMPTS} attempts")));
423 return;
424 }
425
426 let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
427 info!("Reconnecting in {} seconds (attempt {})...", backoff_duration.as_secs(), reconnect_attempts);
428 sleep(backoff_duration).await;
429 }
431 })
432 }
433
434 async fn request_binding_quote(
435 &self,
436 params: &GetAmountOutParams,
437 ) -> Result<SignedQuote, RFQError> {
438 let sell_token = bytes_to_address(¶ms.token_in)?.to_string();
439 let buy_token = bytes_to_address(¶ms.token_out)?.to_string();
440 let sell_amount = params.amount_in.to_string();
441 let sender = bytes_to_address(¶ms.sender)?.to_string();
442 let receiver = bytes_to_address(¶ms.receiver)?.to_string();
443
444 let url = self.quote_endpoint.clone();
445
446 let client = Client::new();
447
448 let start_time = std::time::Instant::now();
449 const MAX_RETRIES: u32 = 3;
450 let mut last_error = None;
451
452 for attempt in 0..MAX_RETRIES {
453 let elapsed = start_time.elapsed();
455 if elapsed >= self.quote_timeout {
456 return Err(last_error.unwrap_or_else(|| {
457 RFQError::ConnectionError(format!(
458 "Bebop quote request timed out after {} seconds",
459 self.quote_timeout.as_secs()
460 ))
461 }));
462 }
463
464 let remaining_time = self.quote_timeout - elapsed;
465
466 let request = client
467 .get(&url)
468 .query(&[
469 ("sell_tokens", sell_token.clone()),
470 ("buy_tokens", buy_token.clone()),
471 ("sell_amounts", sell_amount.clone()),
472 ("taker_address", sender.clone()),
473 ("receiver_address", receiver.clone()),
474 ("approval_type", "Standard".into()),
475 ("skip_validation", "true".into()),
476 ("skip_taker_checks", "true".into()),
477 ("gasless", "false".into()),
478 ("expiry_type", "standard".into()),
479 ("fee", "0".into()),
480 ("is_ui", "false".into()),
481 ("source", self.ws_user.clone()),
482 ])
483 .header("accept", "application/json")
484 .header("name", &self.ws_user)
485 .header("source-auth", &self.ws_key)
486 .header("Authorization", &self.ws_key);
487
488 let response = match timeout(remaining_time, request.send()).await {
489 Ok(Ok(resp)) => resp,
490 Ok(Err(e)) => {
491 warn!(
492 "Bebop quote request failed (attempt {}/{}): {}",
493 attempt + 1,
494 MAX_RETRIES,
495 e
496 );
497 last_error = Some(RFQError::ConnectionError(format!(
498 "Failed to send Bebop quote request: {e}"
499 )));
500 if attempt < MAX_RETRIES - 1 {
501 continue;
502 } else {
503 return Err(last_error.unwrap());
504 }
505 }
506 Err(_) => {
507 return Err(RFQError::ConnectionError(format!(
508 "Bebop quote request timed out after {} seconds",
509 self.quote_timeout.as_secs()
510 )));
511 }
512 };
513
514 let quote_response = match response
515 .json::<BebopQuoteResponse>()
516 .await
517 {
518 Ok(resp) => resp,
519 Err(e) => {
520 warn!(
521 "Bebop quote response parsing failed (attempt {}/{}): {}",
522 attempt + 1,
523 MAX_RETRIES,
524 e
525 );
526 last_error = Some(RFQError::ParsingError(format!(
527 "Failed to parse Bebop quote response: {e}"
528 )));
529 if attempt < MAX_RETRIES - 1 {
530 sleep(Duration::from_millis(100)).await;
531 continue;
532 } else {
533 return Err(last_error.unwrap());
534 }
535 }
536 };
537
538 return Self::process_quote_response(quote_response, params);
539 }
540
541 Err(last_error.unwrap_or_else(|| {
542 RFQError::ConnectionError("Bebop quote request failed after retries".to_string())
543 }))
544 }
545}
546
547#[cfg(test)]
548mod tests {
549 use std::{
550 sync::{Arc, Mutex},
551 time::Duration,
552 };
553
554 use dotenv::dotenv;
555 use futures::SinkExt;
556 use tokio::{net::TcpListener, time::timeout};
557 use tokio_tungstenite::accept_async;
558
559 use super::*;
560 use crate::rfq::constants::get_bebop_auth;
561
562 #[tokio::test]
563 #[ignore] async fn test_bebop_websocket_connection() {
565 let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
568 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
569
570 dotenv().expect("Missing .env file");
571 let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
572
573 let quote_tokens = HashSet::from([
574 Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(), Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), ]);
578
579 let client = BebopClient::new(
580 Chain::Ethereum,
581 HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
582 10.0, auth.user,
584 auth.key,
585 quote_tokens,
586 Duration::from_secs(30),
587 )
588 .unwrap();
589
590 let mut stream = client.stream();
591
592 let result = timeout(Duration::from_secs(10), async {
594 let mut message_count = 0;
595 let max_messages = 5;
596
597 while let Some(result) = stream.next().await {
598 match result {
599 Ok((component_id, msg)) => {
600 println!("Received message with ID: {component_id}");
601
602 assert!(!component_id.is_empty());
603 assert_eq!(component_id, "bebop");
604 assert!(msg.header.timestamp > 0);
605 assert!(!msg.snapshots.states.is_empty());
606
607 let snapshot = &msg.snapshots;
608
609 assert!(!snapshot.states.is_empty());
611
612 println!("Received {} components in this message", snapshot.states.len());
613 for (id, component_with_state) in &snapshot.states {
614 assert_eq!(
615 component_with_state
616 .component
617 .protocol_system,
618 "rfq:bebop"
619 );
620 assert_eq!(
621 component_with_state
622 .component
623 .protocol_type_name,
624 "bebop_pool"
625 );
626 assert_eq!(
627 component_with_state.component.chain,
628 Chain::Ethereum.into()
629 );
630
631 let attributes = &component_with_state.state.attributes;
632
633 assert!(attributes.contains_key("bids"));
635 assert!(attributes.contains_key("asks"));
636 assert!(!attributes["bids"].is_empty());
637 assert!(!attributes["asks"].is_empty());
638
639 if let Some(tvl) = component_with_state.component_tvl {
640 assert!(tvl >= 0.0);
641 println!("Component {id} TVL: ${tvl:.2}");
642 }
643 }
644
645 message_count += 1;
646 if message_count >= max_messages {
647 break;
648 }
649 }
650 Err(e) => {
651 panic!("Stream error: {e}");
652 }
653 }
654 }
655
656 assert!(message_count > 0, "Should have received at least one message");
657 println!("Successfully received {message_count} messages");
658 })
659 .await;
660
661 match result {
662 Ok(_) => println!("Test completed successfully"),
663 Err(_) => panic!("Test timed out - no messages received within 10 seconds"),
664 }
665 }
666
    /// Runs the client against a local mock WebSocket server and expects two messages to be
    /// received over exactly two connections without any stream error.
    ///
    /// The mock server sends one pricing update per connection; it closes the first connection
    /// 100ms after sending and leaves the second one alone.
    #[tokio::test]
    async fn test_websocket_reconnection() {
        // Port 0 lets the OS pick a free port.
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap();
        let addr = listener.local_addr().unwrap();

        // Number of TCP connections the mock server has accepted so far.
        let connection_count = Arc::new(Mutex::new(0u32));

        let connection_count_clone = connection_count.clone();

        tokio::spawn(async move {
            while let Ok((stream, _)) = listener.accept().await {
                *connection_count_clone.lock().unwrap() += 1;
                let count = *connection_count_clone.lock().unwrap();
                println!("Mock server: Connection #{count} established");

                tokio::spawn(async move {
                    if let Ok(ws_stream) = accept_async(stream).await {
                        let (mut ws_sender, _ws_receiver) = ws_stream.split();

                        let weth_addr =
                            hex::decode("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
                        let usdc_addr =
                            hex::decode("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();

                        // One WETH/USDC pair with a single level on each side.
                        let test_price_data = BebopPriceData {
                            base: weth_addr,
                            quote: usdc_addr,
                            last_update_ts: 1752617378,
                            bids: vec![3070.05f32, 0.325717f32],
                            asks: vec![3070.527f32, 0.325717f32],
                        };

                        let pricing_update = BebopPricingUpdate { pairs: vec![test_price_data] };

                        let test_message = pricing_update.encode_to_vec();

                        if count == 1 {
                            println!("Mock server: Connection #1 - sending message then dropping.");
                            let _ = ws_sender
                                .send(Message::Binary(test_message.clone().into()))
                                .await;

                            // Give the client time to read the message before closing.
                            tokio::time::sleep(Duration::from_millis(100)).await;
                            println!("Mock server: Dropping connection #1");
                            let _ = ws_sender.close().await;
                        } else if count == 2 {
                            println!("Mock server: Connection #2 - maintaining stable connection.");
                            let _ = ws_sender
                                .send(Message::Binary(test_message.clone().into()))
                                .await;
                        }
                    }
                });
            }
        });

        // Let the spawned accept loop start before the client connects.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let mut test_quote_tokens = HashSet::new();
        test_quote_tokens
            .insert(Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap());

        let tokens_formatted = vec![
            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap(),
            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
        ];

        // Built field by field so that `price_ws` can point at the mock server.
        let client = BebopClient {
            chain: Chain::Ethereum,
            price_ws: format!("ws://127.0.0.1:{}", addr.port()),
            tokens: tokens_formatted.into_iter().collect(),
            tvl: 1000.0,
            ws_user: "test_user".to_string(),
            ws_key: "test_key".to_string(),
            quote_tokens: test_quote_tokens,
            quote_endpoint: "".to_string(),
            quote_timeout: Duration::from_secs(5),
        };

        let start_time = std::time::Instant::now();
        let mut successful_messages = 0;
        let mut connection_errors = 0;
        let mut first_message_received = false;
        let mut second_message_received = false;

        while start_time.elapsed() < Duration::from_secs(5) && successful_messages < 2 {
            // NOTE(review): `client.stream()` is called on every iteration, so each message is
            // read from a newly created stream (and therefore a new connection) rather than from
            // one stream that reconnects internally — confirm this is intended.
            match timeout(Duration::from_millis(1000), client.stream().next()).await {
                Ok(Some(result)) => match result {
                    Ok((_component_id, _message)) => {
                        successful_messages += 1;
                        println!("Received successful message {successful_messages}");

                        if successful_messages == 1 {
                            first_message_received = true;
                            println!("First message received - connection should drop after this.");
                        } else if successful_messages == 2 {
                            second_message_received = true;
                            println!("Second message received after reconnection.");
                        }
                    }
                    Err(e) => {
                        connection_errors += 1;
                        println!("Connection error during reconnection: {e:?}");
                    }
                },
                Ok(None) => {
                    panic!("Stream ended unexpectedly");
                }
                Err(_) => {
                    println!("Timeout waiting for message (normal during reconnections)");
                    continue;
                }
            }
        }

        let final_connection_count = *connection_count.lock().unwrap();

        assert_eq!(final_connection_count, 2);
        assert!(first_message_received);
        assert!(second_message_received);
        assert_eq!(connection_errors, 0);
        assert_eq!(successful_messages, 2);
    }
810
811 #[tokio::test]
812 #[ignore] async fn test_bebop_quote_single_order() {
814 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
815 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
816 dotenv().expect("Missing .env file");
817 let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
818
819 let client = BebopClient::new(
820 Chain::Ethereum,
821 HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
822 10.0, auth.user,
824 auth.key,
825 HashSet::new(),
826 Duration::from_secs(30),
827 )
828 .unwrap();
829
830 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
831
832 let params = GetAmountOutParams {
833 amount_in: BigUint::from(1_000000000000000000u64),
834 token_in: token_in.clone(),
835 token_out: token_out.clone(),
836 sender: router.clone(),
837 receiver: router,
838 };
839 let quote = client
840 .request_binding_quote(¶ms)
841 .await
842 .unwrap();
843
844 assert_eq!(quote.base_token, token_in);
845 assert_eq!(quote.quote_token, token_out);
846 assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
847
848 assert!(quote.amount_out > BigUint::from(3000000u64));
850
851 assert_eq!(
853 quote
854 .quote_attributes
855 .get("calldata")
856 .unwrap()[..4],
857 Bytes::from_str("0x4dcebcba")
858 .unwrap()
859 .to_vec()
860 );
861 let partial_fill_offset_slice = quote
862 .quote_attributes
863 .get("partial_fill_offset")
864 .unwrap()
865 .as_ref();
866 let mut partial_fill_offset_array = [0u8; 8];
867 partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);
868
869 assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 12);
870 }
871
872 #[tokio::test]
873 #[ignore] async fn test_bebop_quote_aggregate_order() {
875 let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
878 let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
879 dotenv().expect("Missing .env file");
880 let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
881
882 let client = BebopClient::new(
883 Chain::Ethereum,
884 HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
885 10.0, auth.user,
887 auth.key,
888 HashSet::new(),
889 Duration::from_secs(30),
890 )
891 .unwrap();
892
893 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
894
895 let amount_in = BigUint::from_str("20_000_000_000").unwrap(); let params = GetAmountOutParams {
897 amount_in: amount_in.clone(),
898 token_in: token_in.clone(),
899 token_out: token_out.clone(),
900 sender: router.clone(),
901 receiver: router,
902 };
903 let quote = client
904 .request_binding_quote(¶ms)
905 .await
906 .unwrap();
907
908 assert_eq!(quote.base_token, token_in);
909 assert_eq!(quote.quote_token, token_out);
910 assert_eq!(quote.amount_in, amount_in);
911
912 assert!(quote.amount_out > BigUint::from_str("18000000000000000000000").unwrap()); assert_eq!(
917 quote
918 .quote_attributes
919 .get("calldata")
920 .unwrap()[..4],
921 Bytes::from_str("0xa2f74893")
922 .unwrap()
923 .to_vec()
924 );
925 let partial_fill_offset_slice = quote
926 .quote_attributes
927 .get("partial_fill_offset")
928 .unwrap()
929 .as_ref();
930 let mut partial_fill_offset_array = [0u8; 8];
931 partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);
932
933 assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 2);
936 }
937
938 #[test]
939 fn test_process_bebop_quote_response_aggregate_order() {
940 let json =
941 std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
942 .unwrap();
943 let quote_response: BebopQuoteResponse = serde_json::from_str(&json).unwrap();
944 let params = GetAmountOutParams {
945 amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
946 token_in: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
947 token_out: Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap(),
948 sender: Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap(),
949 receiver: Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap(),
950 };
951 let res = BebopClient::process_quote_response(quote_response, ¶ms).unwrap();
952 assert_eq!(res.amount_out, BigUint::from_str("21700473797683400419007").unwrap());
953 assert_eq!(res.amount_in, BigUint::from_str("20000000000").unwrap());
954 assert_eq!(res.base_token, params.token_in);
955 assert_eq!(res.quote_token, params.token_out);
956 }
957
958 #[test]
959 fn test_process_bebop_quote_response_aggregate_order_with_multihop() {
960 let json = std::fs::read_to_string(
961 "src/rfq/protocols/bebop/test_responses/aggregate_order_with_multihop.json",
962 )
963 .unwrap();
964 let quote_response: BebopQuoteResponse = serde_json::from_str(&json).unwrap();
965 let params = GetAmountOutParams {
966 amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
967 token_in: Bytes::from_str("0xDEf1CA1fb7FBcDC777520aa7f396b4E015F497aB").unwrap(),
968 token_out: Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(),
969 sender: Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap(),
970 receiver: Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap(),
971 };
972 let res = BebopClient::process_quote_response(quote_response, ¶ms).unwrap();
973 assert_eq!(res.amount_out, BigUint::from_str("11186653890").unwrap());
974 assert_eq!(res.amount_in, BigUint::from_str("43067495979235520920162").unwrap());
975 assert_eq!(res.base_token, params.token_in);
976 assert_eq!(res.quote_token, params.token_out);
977 }
978
979 async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
981 use tokio::io::AsyncWriteExt;
982
983 let listener = TcpListener::bind("127.0.0.1:0")
984 .await
985 .unwrap();
986 let addr = listener.local_addr().unwrap();
987
988 let json_response =
989 std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
990 .unwrap();
991
992 tokio::spawn(async move {
993 while let Ok((mut stream, _)) = listener.accept().await {
994 let json_response_clone = json_response.clone();
995 tokio::spawn(async move {
996 sleep(Duration::from_millis(delay_ms)).await;
997
998 let response = format!(
999 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1000 json_response_clone.len(),
1001 json_response_clone
1002 );
1003 let _ = stream
1004 .write_all(response.as_bytes())
1005 .await;
1006 let _ = stream.flush().await;
1007 let _ = stream.shutdown().await;
1008 });
1009 }
1010 });
1011
1012 addr
1013 }
1014
1015 fn create_test_bebop_client(quote_endpoint: String, quote_timeout: Duration) -> BebopClient {
1016 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1017 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1018
1019 BebopClient {
1020 chain: Chain::Ethereum,
1021 price_ws: "ws://example.com".to_string(),
1022 quote_endpoint,
1023 tokens: HashSet::from([token_in, token_out]),
1024 tvl: 10.0,
1025 ws_user: "test_user".to_string(),
1026 ws_key: "test_key".to_string(),
1027 quote_tokens: HashSet::new(),
1028 quote_timeout,
1029 }
1030 }
1031
1032 fn create_test_quote_params() -> GetAmountOutParams {
1034 let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1035 let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
1036 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
1037
1038 GetAmountOutParams {
1039 amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
1040 token_in,
1041 token_out,
1042 sender: router.clone(),
1043 receiver: router,
1044 }
1045 }
1046
1047 #[tokio::test]
1048 async fn test_bebop_quote_timeout() {
1049 let addr = create_delayed_response_server(500).await;
1050
1051 let client_short_timeout = create_test_bebop_client(
1053 format!("http://127.0.0.1:{}/quote", addr.port()),
1054 Duration::from_millis(200),
1055 );
1056 let params = create_test_quote_params();
1057
1058 let start = std::time::Instant::now();
1059 let result = client_short_timeout
1060 .request_binding_quote(¶ms)
1061 .await;
1062 let elapsed = start.elapsed();
1063
1064 assert!(result.is_err());
1065 let err = result.unwrap_err();
1066 match err {
1067 RFQError::ConnectionError(msg) => {
1068 assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
1069 }
1070 _ => panic!("Expected ConnectionError, got: {:?}", err),
1071 }
1072 assert!(
1073 elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
1074 "Expected timeout around 200ms, got: {:?}",
1075 elapsed
1076 );
1077
1078 let client_long_timeout = create_test_bebop_client(
1082 format!("http://127.0.0.1:{}/quote", addr.port()),
1083 Duration::from_secs(1),
1084 );
1085
1086 let result = client_long_timeout
1087 .request_binding_quote(¶ms)
1088 .await;
1089
1090 assert!(result.is_ok(), "Expected success, got: {:?}", result);
1092 let quote = result.unwrap();
1093
1094 assert_eq!(quote.base_token, params.token_in);
1096 assert_eq!(quote.quote_token, params.token_out);
1097 }
1098
1099 async fn create_retry_server() -> (std::net::SocketAddr, Arc<Mutex<u32>>) {
1102 use std::sync::{Arc, Mutex};
1103
1104 use tokio::io::AsyncWriteExt;
1105
1106 let request_count = Arc::new(Mutex::new(0u32));
1107 let request_count_clone = request_count.clone();
1108
1109 let listener = TcpListener::bind("127.0.0.1:0")
1110 .await
1111 .unwrap();
1112 let addr = listener.local_addr().unwrap();
1113
1114 let json_response =
1115 std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
1116 .unwrap();
1117
1118 tokio::spawn(async move {
1119 while let Ok((mut stream, _)) = listener.accept().await {
1120 let count_clone = request_count_clone.clone();
1121 let json_response_clone = json_response.clone();
1122 tokio::spawn(async move {
1123 *count_clone.lock().unwrap() += 1;
1124 let count = *count_clone.lock().unwrap();
1125 println!("Mock server: Received request #{count}");
1126
1127 if count <= 2 {
1128 let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1129 let _ = stream
1130 .write_all(response.as_bytes())
1131 .await;
1132 } else {
1133 let response = format!(
1134 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1135 json_response_clone.len(),
1136 json_response_clone
1137 );
1138 let _ = stream
1139 .write_all(response.as_bytes())
1140 .await;
1141 }
1142 let _ = stream.flush().await;
1143 let _ = stream.shutdown().await;
1144 });
1145 }
1146 });
1147 (addr, request_count)
1148 }
1149
1150 #[tokio::test]
1151 async fn test_bebop_quote_retry_on_bad_response() {
1152 let (addr, request_count) = create_retry_server().await;
1153
1154 let client = create_test_bebop_client(
1155 format!("http://127.0.0.1:{}/quote", addr.port()),
1156 Duration::from_secs(5),
1157 );
1158 let params = create_test_quote_params();
1159 let result = client
1160 .request_binding_quote(¶ms)
1161 .await;
1162
1163 assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1164 let quote = result.unwrap();
1165
1166 assert_eq!(quote.amount_in, BigUint::from_str("20000000000").unwrap());
1168 assert_eq!(quote.amount_out, BigUint::from_str("21700473797683400419007").unwrap());
1169
1170 let final_count = *request_count.lock().unwrap();
1172 assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1173 }
1174}