1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address};
8use async_trait::async_trait;
9use futures::{stream::BoxStream, StreamExt};
10use http::Request;
11use num_bigint::BigUint;
12use prost::Message as ProstMessage;
13use reqwest::Client;
14use serde::{Deserialize, Serialize};
15use tokio::time::{sleep, timeout, Duration};
16use tokio_tungstenite::{
17 connect_async_with_config,
18 tungstenite::{handshake::client::generate_key, Message},
19};
20use tracing::{error, info, warn};
21use tycho_common::{
22 models::{protocol::GetAmountOutParams, Chain},
23 simulation::indicatively_priced::SignedQuote,
24 Bytes,
25};
26
27use crate::{
28 rfq::{
29 client::RFQClient,
30 errors::RFQError,
31 models::TimestampHeader,
32 protocols::bebop::models::{
33 BebopOrderToSign, BebopPriceData, BebopPricingUpdate, BebopQuoteResponse,
34 },
35 },
36 tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
37 tycho_common::models::protocol::{ProtocolComponent, ProtocolComponentState},
38};
39
40fn bytes_to_address(address: &Bytes) -> Result<Address, RFQError> {
41 if address.len() == 20 {
42 Ok(Address::from_slice(address))
43 } else {
44 Err(RFQError::InvalidInput(format!("Invalid ERC20 token address: {address:?}")))
45 }
46}
47
48fn chain_to_bebop_url(chain: Chain) -> Result<String, RFQError> {
50 let chain_path = match chain {
51 Chain::Ethereum => "ethereum",
52 Chain::Base => "base",
53 _ => return Err(RFQError::FatalError(format!("Unsupported chain: {chain:?}"))),
54 };
55 let url = format!("api.bebop.xyz/pmm/{chain_path}/v3");
56 Ok(url)
57}
58
/// Client for the Bebop RFQ protocol: streams indicative price levels over a websocket and
/// requests binding quotes over HTTP.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BebopClient {
    /// Chain assigned to every component produced by this client.
    chain: Chain,
    /// URL of the pricing websocket consumed by `stream`.
    price_ws: String,
    /// URL of the HTTP endpoint used by `request_binding_quote`.
    quote_endpoint: String,
    /// Tokens of interest: a pair is only streamed when both its base and quote token are in
    /// this set.
    tokens: HashSet<Bytes>,
    /// Minimum TVL a pair must reach to be included in a streamed snapshot.
    tvl: f64,
    /// User name sent as the `name` header and the `source` query parameter.
    /// Never serialized; defaults to an empty string when absent on deserialization.
    #[serde(skip_serializing, default)]
    ws_user: String,
    /// Key sent as the `Authorization` and `source-auth` headers.
    /// Never serialized; defaults to an empty string when absent on deserialization.
    #[serde(skip_serializing, default)]
    ws_key: String,
    /// Approved quote tokens. When a pair's quote token is not in this set, `stream` looks for
    /// a price of that token against one of these tokens before computing the pair's TVL.
    quote_tokens: HashSet<Bytes>,
    /// Overall time budget for a single `request_binding_quote` call, retries included.
    quote_timeout: Duration,
}
78
79impl BebopClient {
80 pub const PROTOCOL_SYSTEM: &'static str = "rfq:bebop";
81
82 pub fn new(
83 chain: Chain,
84 tokens: HashSet<Bytes>,
85 tvl: f64,
86 ws_user: String,
87 ws_key: String,
88 quote_tokens: HashSet<Bytes>,
89 quote_timeout: Duration,
90 ) -> Result<Self, RFQError> {
91 let url = chain_to_bebop_url(chain)?;
92 Ok(Self {
93 price_ws: "wss://".to_string() + &url + "/pricing?format=protobuf",
94 quote_endpoint: "https://".to_string() + &url + "/quote",
95 tokens,
96 chain,
97 tvl,
98 ws_user,
99 ws_key,
100 quote_tokens,
101 quote_timeout,
102 })
103 }
104
105 fn create_component_with_state(
106 &self,
107 component_id: String,
108 tokens: Vec<tycho_common::Bytes>,
109 price_data: &BebopPriceData,
110 tvl: f64,
111 ) -> ComponentWithState {
112 let protocol_component = ProtocolComponent {
113 id: component_id.clone(),
114 protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
115 protocol_type_name: "bebop_pool".to_string(),
116 chain: self.chain,
117 tokens,
118 contract_addresses: vec![], static_attributes: Default::default(),
120 change: Default::default(),
121 creation_tx: Default::default(),
122 created_at: Default::default(),
123 };
124
125 let mut attributes = HashMap::new();
126
127 if !price_data.bids.is_empty() {
131 let bids_pairs: Vec<(f32, f32)> = price_data
132 .bids
133 .chunks_exact(2)
134 .map(|chunk| (chunk[0], chunk[1]))
135 .collect();
136 let bids_json = serde_json::to_string(&bids_pairs).unwrap_or_default();
137 attributes.insert("bids".to_string(), bids_json.as_bytes().to_vec().into());
138 }
139 if !price_data.asks.is_empty() {
140 let asks_pairs: Vec<(f32, f32)> = price_data
141 .asks
142 .chunks_exact(2)
143 .map(|chunk| (chunk[0], chunk[1]))
144 .collect();
145 let asks_json = serde_json::to_string(&asks_pairs).unwrap_or_default();
146 attributes.insert("asks".to_string(), asks_json.as_bytes().to_vec().into());
147 }
148
149 ComponentWithState {
150 state: ProtocolComponentState::new(&component_id, attributes, HashMap::new()),
151 component: protocol_component,
152 component_tvl: Some(tvl),
153 entrypoints: vec![],
154 }
155 }
156
157 fn process_quote_response(
158 quote_response: BebopQuoteResponse,
159 params: &GetAmountOutParams,
160 ) -> Result<SignedQuote, RFQError> {
161 match quote_response {
162 BebopQuoteResponse::Success(quote) => {
163 quote.validate(params)?;
164
165 let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
166 quote_attributes.insert("calldata".into(), quote.tx.data);
167 quote_attributes.insert(
168 "partial_fill_offset".into(),
169 Bytes::from(
170 quote
171 .partial_fill_offset
172 .to_be_bytes()
173 .to_vec(),
174 ),
175 );
176 let signed_quote = match quote.to_sign {
177 BebopOrderToSign::Single(ref single) => SignedQuote {
178 base_token: params.token_in.clone(),
179 quote_token: params.token_out.clone(),
180 amount_in: BigUint::from_str(&single.taker_amount).map_err(|_| {
181 RFQError::ParsingError(format!(
182 "Failed to parse amount in string: {}",
183 single.taker_amount
184 ))
185 })?,
186 amount_out: BigUint::from_str(&single.maker_amount).map_err(|_| {
187 RFQError::ParsingError(format!(
188 "Failed to parse amount out string: {}",
189 single.maker_amount
190 ))
191 })?,
192 quote_attributes,
193 },
194 BebopOrderToSign::Aggregate(aggregate) => {
195 let amount_in: BigUint = aggregate
197 .taker_tokens
198 .iter()
199 .zip(&aggregate.taker_amounts)
200 .flat_map(|(tokens, amounts)| {
201 tokens
202 .iter()
203 .zip(amounts)
204 .filter_map(|(token, amount)| {
205 if token == ¶ms.token_in {
206 BigUint::from_str(amount).ok()
207 } else {
208 None
209 }
210 })
211 })
212 .sum();
213
214 let amount_out: BigUint = aggregate
216 .maker_tokens
217 .iter()
218 .zip(&aggregate.maker_amounts)
219 .flat_map(|(tokens, amounts)| {
220 tokens
221 .iter()
222 .zip(amounts)
223 .filter_map(|(token, amount)| {
224 if token == ¶ms.token_out {
225 BigUint::from_str(amount).ok()
226 } else {
227 None
228 }
229 })
230 })
231 .sum();
232
233 SignedQuote {
234 base_token: params.token_in.clone(),
235 quote_token: params.token_out.clone(),
236 amount_in,
237 amount_out,
238 quote_attributes,
239 }
240 }
241 };
242
243 Ok(signed_quote)
244 }
245 BebopQuoteResponse::Error(err) => Err(RFQError::FatalError(format!(
246 "Bebop API error: code {} - {} (requestId: {})",
247 err.error.error_code, err.error.message, err.error.request_id
248 ))),
249 }
250 }
251}
252
253#[async_trait]
254impl RFQClient for BebopClient {
255 fn stream(
256 &self,
257 ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
258 let tokens = self.tokens.clone();
259 let url = self.price_ws.clone();
260 let tvl_threshold = self.tvl;
261 let name = self.ws_user.clone();
262 let authorization = self.ws_key.clone();
263 let client = self.clone();
264
265 Box::pin(async_stream::stream! {
266 let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
267 let mut reconnect_attempts = 0;
268 const MAX_RECONNECT_ATTEMPTS: u32 = 10;
269
270 loop {
271 let request = Request::builder()
272 .method("GET")
273 .uri(&url)
274 .header("Host", "api.bebop.xyz")
275 .header("Upgrade", "websocket")
276 .header("Connection", "Upgrade")
277 .header("Sec-WebSocket-Key", generate_key())
278 .header("Sec-WebSocket-Version", "13")
279 .header("name", &name)
280 .header("Authorization", &authorization)
281 .body(())
282 .map_err(|_| RFQError::FatalError("Failed to build request".into()))?;
283
284 let (ws_stream, _) = match connect_async_with_config(request, None, false).await {
286 Ok(connection) => {
287 info!("Successfully connected to Bebop WebSocket");
288 reconnect_attempts = 0; connection
290 },
291 Err(e) => {
292 reconnect_attempts += 1;
293 error!("Failed to connect to Bebop WebSocket (attempt {}): {}", reconnect_attempts, e);
294
295 if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
296 yield Err(RFQError::ConnectionError(format!("Failed to connect after {MAX_RECONNECT_ATTEMPTS} attempts: {e}")));
297 return;
298 }
299
300 let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
301 info!("Retrying connection in {} seconds...", backoff_duration.as_secs());
302 sleep(backoff_duration).await;
303 continue;
304 }
305 };
306
307 let (_, mut ws_receiver) = ws_stream.split();
308
309 while let Some(msg) = ws_receiver.next().await {
311 match msg {
312 Ok(Message::Binary(data)) => {
313 match BebopPricingUpdate::decode(&data[..]) {
314 Ok(protobuf_update) => {
315 let mut new_components = HashMap::new();
316
317 for price_data in &protobuf_update.pairs {
319 let base_bytes = Bytes::from(price_data.base.clone());
320 let quote_bytes = Bytes::from(price_data.quote.clone());
321 if tokens.contains(&base_bytes) && tokens.contains("e_bytes) {
322 let pair_tokens = vec![
323 base_bytes.clone(), quote_bytes.clone()
324 ];
325
326 let mut quote_price_data: Option<&BebopPriceData> = None;
327 if !client.quote_tokens.contains("e_bytes) {
330 for approved_quote_token in &client.quote_tokens {
331 if let Some(quote_data) = protobuf_update.pairs.iter()
334 .find(|p| {
335 (p.base == quote_bytes.as_ref() && p.quote == approved_quote_token.as_ref()) ||
336 (p.quote == quote_bytes.as_ref() && p.base == approved_quote_token.as_ref())
337 }) {
338 quote_price_data = Some(quote_data);
339 break;
340 }
341 }
342
343 if quote_price_data.is_none() {
346 warn!("Quote token {} does not have price levels in approved quote token. Skipping.", hex::encode("e_bytes));
347 continue;
348 }
349 }
350
351 let tvl = price_data.calculate_tvl(quote_price_data);
352 if tvl < tvl_threshold {
353 continue;
354 }
355
356 let pair_str = format!("bebop_{}/{}", hex::encode(&base_bytes), hex::encode("e_bytes));
357 let component_id = format!("{}", keccak256(pair_str.as_bytes()));
358 let component_with_state = client.create_component_with_state(
359 component_id.clone(),
360 pair_tokens,
361 price_data,
362 tvl
363 );
364 new_components.insert(component_id, component_with_state);
365 }
366 }
367
368 let removed_components: HashMap<String, ProtocolComponent> = current_components
372 .iter()
373 .filter(|&(id, _)| !new_components.contains_key(id))
374 .map(|(k, v)| (k.clone(), v.component.clone()))
375 .collect();
376
377 current_components = new_components.clone();
379
380 let snapshot = Snapshot {
381 states: new_components,
382 vm_storage: HashMap::new(),
383 };
384 let timestamp = SystemTime::now().duration_since(
385 SystemTime::UNIX_EPOCH
386 ).map_err(
387 |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
388 )?.as_secs();
389
390 let msg = StateSyncMessage::<TimestampHeader> {
391 header: TimestampHeader { timestamp },
392 snapshots: snapshot,
393 deltas: None, removed_components,
395 };
396
397 yield Ok(("bebop".to_string(), msg));
399 },
400 Err(e) => {
401 error!("Failed to parse protobuf message: {}", e);
402 break;
403 }
404 }
405 }
406 Ok(Message::Close(_)) => {
407 info!("WebSocket connection closed by server");
408 break;
409 }
410 Err(e) => {
411 error!("WebSocket error: {}", e);
412 break;
413 }
414 _ => {} }
416 }
417
418 reconnect_attempts += 1;
420 if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
421 yield Err(RFQError::ConnectionError(format!("Connection failed after {MAX_RECONNECT_ATTEMPTS} attempts")));
422 return;
423 }
424
425 let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
426 info!("Reconnecting in {} seconds (attempt {})...", backoff_duration.as_secs(), reconnect_attempts);
427 sleep(backoff_duration).await;
428 }
430 })
431 }
432
433 async fn request_binding_quote(
434 &self,
435 params: &GetAmountOutParams,
436 ) -> Result<SignedQuote, RFQError> {
437 let sell_token = bytes_to_address(¶ms.token_in)?.to_string();
438 let buy_token = bytes_to_address(¶ms.token_out)?.to_string();
439 let sell_amount = params.amount_in.to_string();
440 let sender = bytes_to_address(¶ms.sender)?.to_string();
441 let receiver = bytes_to_address(¶ms.receiver)?.to_string();
442
443 let url = self.quote_endpoint.clone();
444
445 let client = Client::new();
446
447 let start_time = std::time::Instant::now();
448 const MAX_RETRIES: u32 = 3;
449 let mut last_error = None;
450
451 for attempt in 0..MAX_RETRIES {
452 let elapsed = start_time.elapsed();
454 if elapsed >= self.quote_timeout {
455 return Err(last_error.unwrap_or_else(|| {
456 RFQError::ConnectionError(format!(
457 "Bebop quote request timed out after {} seconds",
458 self.quote_timeout.as_secs()
459 ))
460 }));
461 }
462
463 let remaining_time = self.quote_timeout - elapsed;
464
465 let request = client
466 .get(&url)
467 .query(&[
468 ("sell_tokens", sell_token.clone()),
469 ("buy_tokens", buy_token.clone()),
470 ("sell_amounts", sell_amount.clone()),
471 ("taker_address", sender.clone()),
472 ("receiver_address", receiver.clone()),
473 ("approval_type", "Standard".into()),
474 ("skip_validation", "true".into()),
475 ("skip_taker_checks", "true".into()),
476 ("gasless", "false".into()),
477 ("expiry_type", "standard".into()),
478 ("fee", "0".into()),
479 ("is_ui", "false".into()),
480 ("source", self.ws_user.clone()),
481 ])
482 .header("accept", "application/json")
483 .header("name", &self.ws_user)
484 .header("source-auth", &self.ws_key)
485 .header("Authorization", &self.ws_key);
486
487 let response = match timeout(remaining_time, request.send()).await {
488 Ok(Ok(resp)) => resp,
489 Ok(Err(e)) => {
490 warn!(
491 "Bebop quote request failed (attempt {}/{}): {}",
492 attempt + 1,
493 MAX_RETRIES,
494 e
495 );
496 last_error = Some(RFQError::ConnectionError(format!(
497 "Failed to send Bebop quote request: {e}"
498 )));
499 if attempt < MAX_RETRIES - 1 {
500 continue;
501 } else {
502 return Err(last_error.unwrap());
503 }
504 }
505 Err(_) => {
506 return Err(RFQError::ConnectionError(format!(
507 "Bebop quote request timed out after {} seconds",
508 self.quote_timeout.as_secs()
509 )));
510 }
511 };
512
513 let quote_response = match response
514 .json::<BebopQuoteResponse>()
515 .await
516 {
517 Ok(resp) => resp,
518 Err(e) => {
519 warn!(
520 "Bebop quote response parsing failed (attempt {}/{}): {}",
521 attempt + 1,
522 MAX_RETRIES,
523 e
524 );
525 last_error = Some(RFQError::ParsingError(format!(
526 "Failed to parse Bebop quote response: {e}"
527 )));
528 if attempt < MAX_RETRIES - 1 {
529 sleep(Duration::from_millis(100)).await;
530 continue;
531 } else {
532 return Err(last_error.unwrap());
533 }
534 }
535 };
536
537 return Self::process_quote_response(quote_response, params);
538 }
539
540 Err(last_error.unwrap_or_else(|| {
541 RFQError::ConnectionError("Bebop quote request failed after retries".to_string())
542 }))
543 }
544}
545
546#[cfg(test)]
547mod tests {
548 use std::{
549 sync::{Arc, Mutex},
550 time::Duration,
551 };
552
553 use dotenv::dotenv;
554 use futures::SinkExt;
555 use tokio::{net::TcpListener, time::timeout};
556 use tokio_tungstenite::accept_async;
557
558 use super::*;
559 use crate::rfq::constants::get_bebop_auth;
560
561 #[tokio::test]
562 #[ignore] async fn test_bebop_websocket_connection() {
564 let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
567 let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
568
569 dotenv().expect("Missing .env file");
570 let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
571
572 let quote_tokens = HashSet::from([
573 Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(), Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), ]);
577
578 let client = BebopClient::new(
579 Chain::Ethereum,
580 HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
581 10.0, auth.user,
583 auth.key,
584 quote_tokens,
585 Duration::from_secs(30),
586 )
587 .unwrap();
588
589 let mut stream = client.stream();
590
591 let result = timeout(Duration::from_secs(10), async {
593 let mut message_count = 0;
594 let max_messages = 5;
595
596 while let Some(result) = stream.next().await {
597 match result {
598 Ok((component_id, msg)) => {
599 println!("Received message with ID: {component_id}");
600
601 assert!(!component_id.is_empty());
602 assert_eq!(component_id, "bebop");
603 assert!(msg.header.timestamp > 0);
604 assert!(!msg.snapshots.states.is_empty());
605
606 let snapshot = &msg.snapshots;
607
608 assert!(!snapshot.states.is_empty());
610
611 println!("Received {} components in this message", snapshot.states.len());
612 for (id, component_with_state) in &snapshot.states {
613 assert_eq!(
614 component_with_state
615 .component
616 .protocol_system,
617 "rfq:bebop"
618 );
619 assert_eq!(
620 component_with_state
621 .component
622 .protocol_type_name,
623 "bebop_pool"
624 );
625 assert_eq!(component_with_state.component.chain, Chain::Ethereum);
626
627 let attributes = &component_with_state.state.attributes;
628
629 assert!(attributes.contains_key("bids"));
631 assert!(attributes.contains_key("asks"));
632 assert!(!attributes["bids"].is_empty());
633 assert!(!attributes["asks"].is_empty());
634
635 if let Some(tvl) = component_with_state.component_tvl {
636 assert!(tvl >= 0.0);
637 println!("Component {id} TVL: ${tvl:.2}");
638 }
639 }
640
641 message_count += 1;
642 if message_count >= max_messages {
643 break;
644 }
645 }
646 Err(e) => {
647 panic!("Stream error: {e}");
648 }
649 }
650 }
651
652 assert!(message_count > 0, "Should have received at least one message");
653 println!("Successfully received {message_count} messages");
654 })
655 .await;
656
657 match result {
658 Ok(_) => println!("Test completed successfully"),
659 Err(_) => panic!("Test timed out - no messages received within 10 seconds"),
660 }
661 }
662
663 #[tokio::test]
664 async fn test_websocket_reconnection() {
665 let listener = TcpListener::bind("127.0.0.1:0")
667 .await
668 .unwrap();
669 let addr = listener.local_addr().unwrap();
670
671 let connection_count = Arc::new(Mutex::new(0u32));
673
674 let connection_count_clone = connection_count.clone();
676
677 tokio::spawn(async move {
678 while let Ok((stream, _)) = listener.accept().await {
679 *connection_count_clone.lock().unwrap() += 1;
680 let count = *connection_count_clone.lock().unwrap();
681 println!("Mock server: Connection #{count} established");
682
683 tokio::spawn(async move {
684 if let Ok(ws_stream) = accept_async(stream).await {
685 let (mut ws_sender, _ws_receiver) = ws_stream.split();
686
687 let weth_addr =
689 hex::decode("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
690 let usdc_addr =
691 hex::decode("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
692
693 let test_price_data = BebopPriceData {
694 base: weth_addr,
695 quote: usdc_addr,
696 last_update_ts: 1752617378,
697 bids: vec![3070.05f32, 0.325717f32],
698 asks: vec![3070.527f32, 0.325717f32],
699 };
700
701 let pricing_update = BebopPricingUpdate { pairs: vec![test_price_data] };
702
703 let test_message = pricing_update.encode_to_vec();
704
705 if count == 1 {
706 println!("Mock server: Connection #1 - sending message then dropping.");
708 let _ = ws_sender
709 .send(Message::Binary(test_message.clone().into()))
710 .await;
711
712 tokio::time::sleep(Duration::from_millis(100)).await;
714 println!("Mock server: Dropping connection #1");
715 let _ = ws_sender.close().await;
716 } else if count == 2 {
717 println!("Mock server: Connection #2 - maintaining stable connection.");
719 let _ = ws_sender
720 .send(Message::Binary(test_message.clone().into()))
721 .await;
722 }
723 }
724 });
725 }
726 });
727
728 tokio::time::sleep(Duration::from_millis(50)).await;
730
731 let mut test_quote_tokens = HashSet::new();
732 test_quote_tokens
733 .insert(Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap());
734
735 let tokens_formatted = vec![
736 Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap(),
737 Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
738 ];
739
740 let client = BebopClient {
742 chain: Chain::Ethereum,
743 price_ws: format!("ws://127.0.0.1:{}", addr.port()),
744 tokens: tokens_formatted.into_iter().collect(),
745 tvl: 1000.0,
746 ws_user: "test_user".to_string(),
747 ws_key: "test_key".to_string(),
748 quote_tokens: test_quote_tokens,
749 quote_endpoint: "".to_string(),
750 quote_timeout: Duration::from_secs(5),
751 };
752
753 let start_time = std::time::Instant::now();
754 let mut successful_messages = 0;
755 let mut connection_errors = 0;
756 let mut first_message_received = false;
757 let mut second_message_received = false;
758
759 while start_time.elapsed() < Duration::from_secs(5) && successful_messages < 2 {
766 match timeout(Duration::from_millis(1000), client.stream().next()).await {
767 Ok(Some(result)) => match result {
768 Ok((_component_id, _message)) => {
769 successful_messages += 1;
770 println!("Received successful message {successful_messages}");
771
772 if successful_messages == 1 {
773 first_message_received = true;
774 println!("First message received - connection should drop after this.");
775 } else if successful_messages == 2 {
776 second_message_received = true;
777 println!("Second message received after reconnection.");
778 }
779 }
780 Err(e) => {
781 connection_errors += 1;
782 println!("Connection error during reconnection: {e:?}");
783 }
784 },
785 Ok(None) => {
786 panic!("Stream ended unexpectedly");
787 }
788 Err(_) => {
789 println!("Timeout waiting for message (normal during reconnections)");
790 continue;
791 }
792 }
793 }
794
795 let final_connection_count = *connection_count.lock().unwrap();
796
797 assert_eq!(final_connection_count, 2);
801 assert!(first_message_received);
802 assert!(second_message_received);
803 assert_eq!(connection_errors, 0);
804 assert_eq!(successful_messages, 2);
805 }
806
807 #[tokio::test]
808 #[ignore] async fn test_bebop_quote_single_order() {
810 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
811 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
812 dotenv().expect("Missing .env file");
813 let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
814
815 let client = BebopClient::new(
816 Chain::Ethereum,
817 HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
818 10.0, auth.user,
820 auth.key,
821 HashSet::new(),
822 Duration::from_secs(30),
823 )
824 .unwrap();
825
826 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
827
828 let params = GetAmountOutParams {
829 amount_in: BigUint::from(1_000000000000000000u64),
830 token_in: token_in.clone(),
831 token_out: token_out.clone(),
832 sender: router.clone(),
833 receiver: router,
834 };
835 let quote = client
836 .request_binding_quote(¶ms)
837 .await
838 .unwrap();
839
840 assert_eq!(quote.base_token, token_in);
841 assert_eq!(quote.quote_token, token_out);
842 assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
843
844 assert!(quote.amount_out > BigUint::from(3000000u64));
846
847 assert_eq!(
849 quote
850 .quote_attributes
851 .get("calldata")
852 .unwrap()[..4],
853 Bytes::from_str("0x4dcebcba")
854 .unwrap()
855 .to_vec()
856 );
857 let partial_fill_offset_slice = quote
858 .quote_attributes
859 .get("partial_fill_offset")
860 .unwrap()
861 .as_ref();
862 let mut partial_fill_offset_array = [0u8; 8];
863 partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);
864
865 assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 12);
866 }
867
868 #[tokio::test]
869 #[ignore] async fn test_bebop_quote_aggregate_order() {
871 let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
874 let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
875 dotenv().expect("Missing .env file");
876 let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
877
878 let client = BebopClient::new(
879 Chain::Ethereum,
880 HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
881 10.0, auth.user,
883 auth.key,
884 HashSet::new(),
885 Duration::from_secs(30),
886 )
887 .unwrap();
888
889 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
890
891 let amount_in = BigUint::from_str("20_000_000_000").unwrap(); let params = GetAmountOutParams {
893 amount_in: amount_in.clone(),
894 token_in: token_in.clone(),
895 token_out: token_out.clone(),
896 sender: router.clone(),
897 receiver: router,
898 };
899 let quote = client
900 .request_binding_quote(¶ms)
901 .await
902 .unwrap();
903
904 assert_eq!(quote.base_token, token_in);
905 assert_eq!(quote.quote_token, token_out);
906 assert_eq!(quote.amount_in, amount_in);
907
908 assert!(quote.amount_out > BigUint::from_str("18000000000000000000000").unwrap()); assert_eq!(
913 quote
914 .quote_attributes
915 .get("calldata")
916 .unwrap()[..4],
917 Bytes::from_str("0xa2f74893")
918 .unwrap()
919 .to_vec()
920 );
921 let partial_fill_offset_slice = quote
922 .quote_attributes
923 .get("partial_fill_offset")
924 .unwrap()
925 .as_ref();
926 let mut partial_fill_offset_array = [0u8; 8];
927 partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);
928
929 assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 2);
932 }
933
934 #[test]
935 fn test_process_bebop_quote_response_aggregate_order() {
936 let json =
937 std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
938 .unwrap();
939 let quote_response: BebopQuoteResponse = serde_json::from_str(&json).unwrap();
940 let params = GetAmountOutParams {
941 amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
942 token_in: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
943 token_out: Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap(),
944 sender: Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap(),
945 receiver: Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap(),
946 };
947 let res = BebopClient::process_quote_response(quote_response, ¶ms).unwrap();
948 assert_eq!(res.amount_out, BigUint::from_str("21700473797683400419007").unwrap());
949 assert_eq!(res.amount_in, BigUint::from_str("20000000000").unwrap());
950 assert_eq!(res.base_token, params.token_in);
951 assert_eq!(res.quote_token, params.token_out);
952 }
953
954 #[test]
955 fn test_process_bebop_quote_response_aggregate_order_with_multihop() {
956 let json = std::fs::read_to_string(
957 "src/rfq/protocols/bebop/test_responses/aggregate_order_with_multihop.json",
958 )
959 .unwrap();
960 let quote_response: BebopQuoteResponse = serde_json::from_str(&json).unwrap();
961 let params = GetAmountOutParams {
962 amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
963 token_in: Bytes::from_str("0xDEf1CA1fb7FBcDC777520aa7f396b4E015F497aB").unwrap(),
964 token_out: Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(),
965 sender: Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap(),
966 receiver: Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap(),
967 };
968 let res = BebopClient::process_quote_response(quote_response, ¶ms).unwrap();
969 assert_eq!(res.amount_out, BigUint::from_str("11186653890").unwrap());
970 assert_eq!(res.amount_in, BigUint::from_str("43067495979235520920162").unwrap());
971 assert_eq!(res.base_token, params.token_in);
972 assert_eq!(res.quote_token, params.token_out);
973 }
974
975 async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
977 use tokio::io::AsyncWriteExt;
978
979 let listener = TcpListener::bind("127.0.0.1:0")
980 .await
981 .unwrap();
982 let addr = listener.local_addr().unwrap();
983
984 let json_response =
985 std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
986 .unwrap();
987
988 tokio::spawn(async move {
989 while let Ok((mut stream, _)) = listener.accept().await {
990 let json_response_clone = json_response.clone();
991 tokio::spawn(async move {
992 sleep(Duration::from_millis(delay_ms)).await;
993
994 let response = format!(
995 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
996 json_response_clone.len(),
997 json_response_clone
998 );
999 let _ = stream
1000 .write_all(response.as_bytes())
1001 .await;
1002 let _ = stream.flush().await;
1003 let _ = stream.shutdown().await;
1004 });
1005 }
1006 });
1007
1008 addr
1009 }
1010
1011 fn create_test_bebop_client(quote_endpoint: String, quote_timeout: Duration) -> BebopClient {
1012 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1013 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1014
1015 BebopClient {
1016 chain: Chain::Ethereum,
1017 price_ws: "ws://example.com".to_string(),
1018 quote_endpoint,
1019 tokens: HashSet::from([token_in, token_out]),
1020 tvl: 10.0,
1021 ws_user: "test_user".to_string(),
1022 ws_key: "test_key".to_string(),
1023 quote_tokens: HashSet::new(),
1024 quote_timeout,
1025 }
1026 }
1027
1028 fn create_test_quote_params() -> GetAmountOutParams {
1030 let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1031 let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
1032 let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
1033
1034 GetAmountOutParams {
1035 amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
1036 token_in,
1037 token_out,
1038 sender: router.clone(),
1039 receiver: router,
1040 }
1041 }
1042
1043 #[tokio::test]
1044 async fn test_bebop_quote_timeout() {
1045 let addr = create_delayed_response_server(500).await;
1046
1047 let client_short_timeout = create_test_bebop_client(
1049 format!("http://127.0.0.1:{}/quote", addr.port()),
1050 Duration::from_millis(200),
1051 );
1052 let params = create_test_quote_params();
1053
1054 let start = std::time::Instant::now();
1055 let result = client_short_timeout
1056 .request_binding_quote(¶ms)
1057 .await;
1058 let elapsed = start.elapsed();
1059
1060 assert!(result.is_err());
1061 let err = result.unwrap_err();
1062 match err {
1063 RFQError::ConnectionError(msg) => {
1064 assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
1065 }
1066 _ => panic!("Expected ConnectionError, got: {:?}", err),
1067 }
1068 assert!(
1069 elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
1070 "Expected timeout around 200ms, got: {:?}",
1071 elapsed
1072 );
1073
1074 let client_long_timeout = create_test_bebop_client(
1078 format!("http://127.0.0.1:{}/quote", addr.port()),
1079 Duration::from_secs(1),
1080 );
1081
1082 let result = client_long_timeout
1083 .request_binding_quote(¶ms)
1084 .await;
1085
1086 assert!(result.is_ok(), "Expected success, got: {:?}", result);
1088 let quote = result.unwrap();
1089
1090 assert_eq!(quote.base_token, params.token_in);
1092 assert_eq!(quote.quote_token, params.token_out);
1093 }
1094
1095 async fn create_retry_server() -> (std::net::SocketAddr, Arc<Mutex<u32>>) {
1098 use std::sync::{Arc, Mutex};
1099
1100 use tokio::io::AsyncWriteExt;
1101
1102 let request_count = Arc::new(Mutex::new(0u32));
1103 let request_count_clone = request_count.clone();
1104
1105 let listener = TcpListener::bind("127.0.0.1:0")
1106 .await
1107 .unwrap();
1108 let addr = listener.local_addr().unwrap();
1109
1110 let json_response =
1111 std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
1112 .unwrap();
1113
1114 tokio::spawn(async move {
1115 while let Ok((mut stream, _)) = listener.accept().await {
1116 let count_clone = request_count_clone.clone();
1117 let json_response_clone = json_response.clone();
1118 tokio::spawn(async move {
1119 *count_clone.lock().unwrap() += 1;
1120 let count = *count_clone.lock().unwrap();
1121 println!("Mock server: Received request #{count}");
1122
1123 if count <= 2 {
1124 let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1125 let _ = stream
1126 .write_all(response.as_bytes())
1127 .await;
1128 } else {
1129 let response = format!(
1130 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1131 json_response_clone.len(),
1132 json_response_clone
1133 );
1134 let _ = stream
1135 .write_all(response.as_bytes())
1136 .await;
1137 }
1138 let _ = stream.flush().await;
1139 let _ = stream.shutdown().await;
1140 });
1141 }
1142 });
1143 (addr, request_count)
1144 }
1145
1146 #[tokio::test]
1147 async fn test_bebop_quote_retry_on_bad_response() {
1148 let (addr, request_count) = create_retry_server().await;
1149
1150 let client = create_test_bebop_client(
1151 format!("http://127.0.0.1:{}/quote", addr.port()),
1152 Duration::from_secs(5),
1153 );
1154 let params = create_test_quote_params();
1155 let result = client
1156 .request_binding_quote(¶ms)
1157 .await;
1158
1159 assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1160 let quote = result.unwrap();
1161
1162 assert_eq!(quote.amount_in, BigUint::from_str("20000000000").unwrap());
1164 assert_eq!(quote.amount_out, BigUint::from_str("21700473797683400419007").unwrap());
1165
1166 let final_count = *request_count.lock().unwrap();
1168 assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1169 }
1170
1171 #[test]
1172 fn test_bebop_client_serialize_deserialize_roundtrip() {
1173 let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1174 let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1175 let quote_token = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1176
1177 let original = BebopClient {
1178 chain: Chain::Ethereum,
1179 price_ws: "wss://api.bebop.xyz/pricing".to_string(),
1180 quote_endpoint: "https://api.bebop.xyz/quote".to_string(),
1181 tokens: HashSet::from([token_in.clone(), token_out.clone()]),
1182 tvl: 50.5,
1183 ws_user: "secret_user".to_string(),
1184 ws_key: "secret_key".to_string(),
1185 quote_tokens: HashSet::from([quote_token.clone()]),
1186 quote_timeout: Duration::from_millis(5500),
1187 };
1188
1189 let serialized = serde_json::to_string(&original).unwrap();
1190 let deserialized: BebopClient = serde_json::from_str(&serialized).unwrap();
1191
1192 assert_eq!(deserialized.chain, original.chain);
1194 assert_eq!(deserialized.price_ws, original.price_ws);
1195 assert_eq!(deserialized.quote_endpoint, original.quote_endpoint);
1196 assert_eq!(deserialized.tokens, original.tokens);
1197 assert_eq!(deserialized.tvl, original.tvl);
1198 assert_eq!(deserialized.quote_tokens, original.quote_tokens);
1199 assert_eq!(deserialized.quote_timeout, original.quote_timeout);
1200
1201 assert_eq!(deserialized.ws_user, "");
1203 assert_eq!(deserialized.ws_key, "");
1204 assert_ne!(deserialized.ws_user, original.ws_user);
1205 assert_ne!(deserialized.ws_key, original.ws_key);
1206 }
1207
1208 #[test]
1209 fn test_bebop_client_deserialize_with_credentials() {
1210 let json = r#"{
1213 "chain": "ethereum",
1214 "price_ws": "wss://api.bebop.xyz/pricing",
1215 "quote_endpoint": "https://api.bebop.xyz/quote",
1216 "tokens": [],
1217 "tvl": 10.0,
1218 "ws_user": "provided_user",
1219 "ws_key": "provided_key",
1220 "quote_tokens": [],
1221 "quote_timeout": {"secs": 30, "nanos": 0}
1222 }"#;
1223
1224 let client: BebopClient = serde_json::from_str(json).unwrap();
1225
1226 assert_eq!(client.ws_user, "provided_user");
1228 assert_eq!(client.ws_key, "provided_key");
1229 }
1230}