1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address};
8use async_trait::async_trait;
9use futures::{stream::BoxStream, StreamExt};
10use http::Request;
11use num_bigint::BigUint;
12use prost::Message as ProstMessage;
13use reqwest::Client;
14use serde::{Deserialize, Serialize};
15use tokio::time::{sleep, timeout, Duration};
16use tokio_tungstenite::{
17 connect_async_with_config,
18 tungstenite::{handshake::client::generate_key, Message},
19};
20use tracing::{error, info, warn};
21use tycho_common::{
22 models::{protocol::GetAmountOutParams, Chain},
23 simulation::indicatively_priced::SignedQuote,
24 Bytes,
25};
26
27use crate::{
28 rfq::{
29 client::RFQClient,
30 errors::RFQError,
31 models::TimestampHeader,
32 protocols::bebop::models::{
33 BebopOrderToSign, BebopPriceData, BebopPricingUpdate, BebopQuoteResponse,
34 },
35 },
36 tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
37 tycho_common::dto::{ProtocolComponent, ResponseProtocolState},
38};
39
40fn bytes_to_address(address: &Bytes) -> Result<Address, RFQError> {
41 if address.len() == 20 {
42 Ok(Address::from_slice(address))
43 } else {
44 Err(RFQError::InvalidInput(format!("Invalid ERC20 token address: {address:?}")))
45 }
46}
47
48fn chain_to_bebop_url(chain: Chain) -> Result<String, RFQError> {
50 let chain_path = match chain {
51 Chain::Ethereum => "ethereum",
52 Chain::Base => "base",
53 _ => return Err(RFQError::FatalError(format!("Unsupported chain: {chain:?}"))),
54 };
55 let url = format!("api.bebop.xyz/pmm/{chain_path}/v3");
56 Ok(url)
57}
58
/// Client for the Bebop RFQ API.
///
/// Holds the configuration needed to stream indicative prices over a WebSocket
/// and to request binding quotes over HTTP. The credentials (`ws_user`, `ws_key`)
/// are never written out when the client is serialized.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BebopClient {
    /// Chain this client is configured for; copied onto every emitted component.
    chain: Chain,
    /// WebSocket URL the pricing updates are streamed from.
    price_ws: String,
    /// HTTP endpoint binding quotes are requested from.
    quote_endpoint: String,
    /// Tokens of interest: a streamed pair is kept only when both its base and
    /// its quote token are contained in this set.
    tokens: HashSet<Bytes>,
    /// Minimum TVL a pair must reach to be emitted by the price stream.
    tvl: f64,
    /// User name sent as the `name` header (and as the `source` query parameter
    /// of quote requests). Skipped on serialization; defaults to an empty string
    /// when missing on deserialization.
    #[serde(skip_serializing, default)]
    ws_user: String,
    /// Key sent as the `Authorization` header (and as `source-auth` on quote
    /// requests). Skipped on serialization; defaults to an empty string when
    /// missing on deserialization.
    #[serde(skip_serializing, default)]
    ws_key: String,
    /// Approved quote tokens. When a pair's quote token is not in this set, the
    /// stream looks for another pair linking it to one of these tokens before
    /// computing TVL, and skips the pair if none exists.
    quote_tokens: HashSet<Bytes>,
    /// Time budget for a binding-quote request.
    quote_timeout: Duration,
}
78
79impl BebopClient {
80 pub const PROTOCOL_SYSTEM: &'static str = "rfq:bebop";
81
82 pub fn new(
83 chain: Chain,
84 tokens: HashSet<Bytes>,
85 tvl: f64,
86 ws_user: String,
87 ws_key: String,
88 quote_tokens: HashSet<Bytes>,
89 quote_timeout: Duration,
90 ) -> Result<Self, RFQError> {
91 let url = chain_to_bebop_url(chain)?;
92 Ok(Self {
93 price_ws: "wss://".to_string() + &url + "/pricing?format=protobuf",
94 quote_endpoint: "https://".to_string() + &url + "/quote",
95 tokens,
96 chain,
97 tvl,
98 ws_user,
99 ws_key,
100 quote_tokens,
101 quote_timeout,
102 })
103 }
104
105 fn create_component_with_state(
106 &self,
107 component_id: String,
108 tokens: Vec<tycho_common::Bytes>,
109 price_data: &BebopPriceData,
110 tvl: f64,
111 ) -> ComponentWithState {
112 let protocol_component = ProtocolComponent {
113 id: component_id.clone(),
114 protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
115 protocol_type_name: "bebop_pool".to_string(),
116 chain: self.chain.into(),
117 tokens,
118 contract_ids: vec![], static_attributes: Default::default(),
120 change: Default::default(),
121 creation_tx: Default::default(),
122 created_at: Default::default(),
123 };
124
125 let mut attributes = HashMap::new();
126
127 if !price_data.bids.is_empty() {
131 let bids_pairs: Vec<(f32, f32)> = price_data
132 .bids
133 .chunks_exact(2)
134 .map(|chunk| (chunk[0], chunk[1]))
135 .collect();
136 let bids_json = serde_json::to_string(&bids_pairs).unwrap_or_default();
137 attributes.insert("bids".to_string(), bids_json.as_bytes().to_vec().into());
138 }
139 if !price_data.asks.is_empty() {
140 let asks_pairs: Vec<(f32, f32)> = price_data
141 .asks
142 .chunks_exact(2)
143 .map(|chunk| (chunk[0], chunk[1]))
144 .collect();
145 let asks_json = serde_json::to_string(&asks_pairs).unwrap_or_default();
146 attributes.insert("asks".to_string(), asks_json.as_bytes().to_vec().into());
147 }
148
149 ComponentWithState {
150 state: ResponseProtocolState {
151 component_id: component_id.clone(),
152 attributes,
153 balances: HashMap::new(),
154 },
155 component: protocol_component,
156 component_tvl: Some(tvl),
157 entrypoints: vec![],
158 }
159 }
160
161 fn process_quote_response(
162 quote_response: BebopQuoteResponse,
163 params: &GetAmountOutParams,
164 ) -> Result<SignedQuote, RFQError> {
165 match quote_response {
166 BebopQuoteResponse::Success(quote) => {
167 quote.validate(params)?;
168
169 let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
170 quote_attributes.insert("calldata".into(), quote.tx.data);
171 quote_attributes.insert(
172 "partial_fill_offset".into(),
173 Bytes::from(
174 quote
175 .partial_fill_offset
176 .to_be_bytes()
177 .to_vec(),
178 ),
179 );
180 let signed_quote = match quote.to_sign {
181 BebopOrderToSign::Single(ref single) => SignedQuote {
182 base_token: params.token_in.clone(),
183 quote_token: params.token_out.clone(),
184 amount_in: BigUint::from_str(&single.taker_amount).map_err(|_| {
185 RFQError::ParsingError(format!(
186 "Failed to parse amount in string: {}",
187 single.taker_amount
188 ))
189 })?,
190 amount_out: BigUint::from_str(&single.maker_amount).map_err(|_| {
191 RFQError::ParsingError(format!(
192 "Failed to parse amount out string: {}",
193 single.maker_amount
194 ))
195 })?,
196 quote_attributes,
197 },
198 BebopOrderToSign::Aggregate(aggregate) => {
199 let amount_in: BigUint = aggregate
201 .taker_tokens
202 .iter()
203 .zip(&aggregate.taker_amounts)
204 .flat_map(|(tokens, amounts)| {
205 tokens
206 .iter()
207 .zip(amounts)
208 .filter_map(|(token, amount)| {
209 if token == ¶ms.token_in {
210 BigUint::from_str(amount).ok()
211 } else {
212 None
213 }
214 })
215 })
216 .sum();
217
218 let amount_out: BigUint = aggregate
220 .maker_tokens
221 .iter()
222 .zip(&aggregate.maker_amounts)
223 .flat_map(|(tokens, amounts)| {
224 tokens
225 .iter()
226 .zip(amounts)
227 .filter_map(|(token, amount)| {
228 if token == ¶ms.token_out {
229 BigUint::from_str(amount).ok()
230 } else {
231 None
232 }
233 })
234 })
235 .sum();
236
237 SignedQuote {
238 base_token: params.token_in.clone(),
239 quote_token: params.token_out.clone(),
240 amount_in,
241 amount_out,
242 quote_attributes,
243 }
244 }
245 };
246
247 Ok(signed_quote)
248 }
249 BebopQuoteResponse::Error(err) => Err(RFQError::FatalError(format!(
250 "Bebop API error: code {} - {} (requestId: {})",
251 err.error.error_code, err.error.message, err.error.request_id
252 ))),
253 }
254 }
255}
256
257#[async_trait]
258impl RFQClient for BebopClient {
259 fn stream(
260 &self,
261 ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
262 let tokens = self.tokens.clone();
263 let url = self.price_ws.clone();
264 let tvl_threshold = self.tvl;
265 let name = self.ws_user.clone();
266 let authorization = self.ws_key.clone();
267 let client = self.clone();
268
269 Box::pin(async_stream::stream! {
270 let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
271 let mut reconnect_attempts = 0;
272 const MAX_RECONNECT_ATTEMPTS: u32 = 10;
273
274 loop {
275 let request = Request::builder()
276 .method("GET")
277 .uri(&url)
278 .header("Host", "api.bebop.xyz")
279 .header("Upgrade", "websocket")
280 .header("Connection", "Upgrade")
281 .header("Sec-WebSocket-Key", generate_key())
282 .header("Sec-WebSocket-Version", "13")
283 .header("name", &name)
284 .header("Authorization", &authorization)
285 .body(())
286 .map_err(|_| RFQError::FatalError("Failed to build request".into()))?;
287
288 let (ws_stream, _) = match connect_async_with_config(request, None, false).await {
290 Ok(connection) => {
291 info!("Successfully connected to Bebop WebSocket");
292 reconnect_attempts = 0; connection
294 },
295 Err(e) => {
296 reconnect_attempts += 1;
297 error!("Failed to connect to Bebop WebSocket (attempt {}): {}", reconnect_attempts, e);
298
299 if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
300 yield Err(RFQError::ConnectionError(format!("Failed to connect after {MAX_RECONNECT_ATTEMPTS} attempts: {e}")));
301 return;
302 }
303
304 let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
305 info!("Retrying connection in {} seconds...", backoff_duration.as_secs());
306 sleep(backoff_duration).await;
307 continue;
308 }
309 };
310
311 let (_, mut ws_receiver) = ws_stream.split();
312
313 while let Some(msg) = ws_receiver.next().await {
315 match msg {
316 Ok(Message::Binary(data)) => {
317 match BebopPricingUpdate::decode(&data[..]) {
318 Ok(protobuf_update) => {
319 let mut new_components = HashMap::new();
320
321 for price_data in &protobuf_update.pairs {
323 let base_bytes = Bytes::from(price_data.base.clone());
324 let quote_bytes = Bytes::from(price_data.quote.clone());
325 if tokens.contains(&base_bytes) && tokens.contains("e_bytes) {
326 let pair_tokens = vec![
327 base_bytes.clone(), quote_bytes.clone()
328 ];
329
330 let mut quote_price_data: Option<&BebopPriceData> = None;
331 if !client.quote_tokens.contains("e_bytes) {
334 for approved_quote_token in &client.quote_tokens {
335 if let Some(quote_data) = protobuf_update.pairs.iter()
338 .find(|p| {
339 (p.base == quote_bytes.as_ref() && p.quote == approved_quote_token.as_ref()) ||
340 (p.quote == quote_bytes.as_ref() && p.base == approved_quote_token.as_ref())
341 }) {
342 quote_price_data = Some(quote_data);
343 break;
344 }
345 }
346
347 if quote_price_data.is_none() {
350 warn!("Quote token {} does not have price levels in approved quote token. Skipping.", hex::encode("e_bytes));
351 continue;
352 }
353 }
354
355 let tvl = price_data.calculate_tvl(quote_price_data);
356 if tvl < tvl_threshold {
357 continue;
358 }
359
360 let pair_str = format!("bebop_{}/{}", hex::encode(&base_bytes), hex::encode("e_bytes));
361 let component_id = format!("{}", keccak256(pair_str.as_bytes()));
362 let component_with_state = client.create_component_with_state(
363 component_id.clone(),
364 pair_tokens,
365 price_data,
366 tvl
367 );
368 new_components.insert(component_id, component_with_state);
369 }
370 }
371
372 let removed_components: HashMap<String, ProtocolComponent> = current_components
376 .iter()
377 .filter(|&(id, _)| !new_components.contains_key(id))
378 .map(|(k, v)| (k.clone(), v.component.clone()))
379 .collect();
380
381 current_components = new_components.clone();
383
384 let snapshot = Snapshot {
385 states: new_components,
386 vm_storage: HashMap::new(),
387 };
388 let timestamp = SystemTime::now().duration_since(
389 SystemTime::UNIX_EPOCH
390 ).map_err(
391 |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
392 )?.as_secs();
393
394 let msg = StateSyncMessage::<TimestampHeader> {
395 header: TimestampHeader { timestamp },
396 snapshots: snapshot,
397 deltas: None, removed_components,
399 };
400
401 yield Ok(("bebop".to_string(), msg));
403 },
404 Err(e) => {
405 error!("Failed to parse protobuf message: {}", e);
406 break;
407 }
408 }
409 }
410 Ok(Message::Close(_)) => {
411 info!("WebSocket connection closed by server");
412 break;
413 }
414 Err(e) => {
415 error!("WebSocket error: {}", e);
416 break;
417 }
418 _ => {} }
420 }
421
422 reconnect_attempts += 1;
424 if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
425 yield Err(RFQError::ConnectionError(format!("Connection failed after {MAX_RECONNECT_ATTEMPTS} attempts")));
426 return;
427 }
428
429 let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
430 info!("Reconnecting in {} seconds (attempt {})...", backoff_duration.as_secs(), reconnect_attempts);
431 sleep(backoff_duration).await;
432 }
434 })
435 }
436
437 async fn request_binding_quote(
438 &self,
439 params: &GetAmountOutParams,
440 ) -> Result<SignedQuote, RFQError> {
441 let sell_token = bytes_to_address(¶ms.token_in)?.to_string();
442 let buy_token = bytes_to_address(¶ms.token_out)?.to_string();
443 let sell_amount = params.amount_in.to_string();
444 let sender = bytes_to_address(¶ms.sender)?.to_string();
445 let receiver = bytes_to_address(¶ms.receiver)?.to_string();
446
447 let url = self.quote_endpoint.clone();
448
449 let client = Client::new();
450
451 let start_time = std::time::Instant::now();
452 const MAX_RETRIES: u32 = 3;
453 let mut last_error = None;
454
455 for attempt in 0..MAX_RETRIES {
456 let elapsed = start_time.elapsed();
458 if elapsed >= self.quote_timeout {
459 return Err(last_error.unwrap_or_else(|| {
460 RFQError::ConnectionError(format!(
461 "Bebop quote request timed out after {} seconds",
462 self.quote_timeout.as_secs()
463 ))
464 }));
465 }
466
467 let remaining_time = self.quote_timeout - elapsed;
468
469 let request = client
470 .get(&url)
471 .query(&[
472 ("sell_tokens", sell_token.clone()),
473 ("buy_tokens", buy_token.clone()),
474 ("sell_amounts", sell_amount.clone()),
475 ("taker_address", sender.clone()),
476 ("receiver_address", receiver.clone()),
477 ("approval_type", "Standard".into()),
478 ("skip_validation", "true".into()),
479 ("skip_taker_checks", "true".into()),
480 ("gasless", "false".into()),
481 ("expiry_type", "standard".into()),
482 ("fee", "0".into()),
483 ("is_ui", "false".into()),
484 ("source", self.ws_user.clone()),
485 ])
486 .header("accept", "application/json")
487 .header("name", &self.ws_user)
488 .header("source-auth", &self.ws_key)
489 .header("Authorization", &self.ws_key);
490
491 let response = match timeout(remaining_time, request.send()).await {
492 Ok(Ok(resp)) => resp,
493 Ok(Err(e)) => {
494 warn!(
495 "Bebop quote request failed (attempt {}/{}): {}",
496 attempt + 1,
497 MAX_RETRIES,
498 e
499 );
500 last_error = Some(RFQError::ConnectionError(format!(
501 "Failed to send Bebop quote request: {e}"
502 )));
503 if attempt < MAX_RETRIES - 1 {
504 continue;
505 } else {
506 return Err(last_error.unwrap());
507 }
508 }
509 Err(_) => {
510 return Err(RFQError::ConnectionError(format!(
511 "Bebop quote request timed out after {} seconds",
512 self.quote_timeout.as_secs()
513 )));
514 }
515 };
516
517 let quote_response = match response
518 .json::<BebopQuoteResponse>()
519 .await
520 {
521 Ok(resp) => resp,
522 Err(e) => {
523 warn!(
524 "Bebop quote response parsing failed (attempt {}/{}): {}",
525 attempt + 1,
526 MAX_RETRIES,
527 e
528 );
529 last_error = Some(RFQError::ParsingError(format!(
530 "Failed to parse Bebop quote response: {e}"
531 )));
532 if attempt < MAX_RETRIES - 1 {
533 sleep(Duration::from_millis(100)).await;
534 continue;
535 } else {
536 return Err(last_error.unwrap());
537 }
538 }
539 };
540
541 return Self::process_quote_response(quote_response, params);
542 }
543
544 Err(last_error.unwrap_or_else(|| {
545 RFQError::ConnectionError("Bebop quote request failed after retries".to_string())
546 }))
547 }
548}
549
#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        time::Duration,
    };

    use dotenv::dotenv;
    use futures::SinkExt;
    use tokio::{net::TcpListener, time::timeout};
    use tokio_tungstenite::accept_async;

    use super::*;
    use crate::rfq::constants::get_bebop_auth;

    /// Live test (ignored by default): connects to the real Bebop WebSocket with
    /// credentials from `.env` and checks the shape of up to five messages.
    #[tokio::test]
    #[ignore]
    async fn test_bebop_websocket_connection() {
        let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();

        dotenv().expect("Missing .env file");
        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");

        let quote_tokens = HashSet::from([
            Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
            Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(),
        ]);

        let client = BebopClient::new(
            Chain::Ethereum,
            HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
            10.0, // TVL threshold
            auth.user,
            auth.key,
            quote_tokens,
            Duration::from_secs(30),
        )
        .unwrap();

        let mut stream = client.stream();

        let result = timeout(Duration::from_secs(10), async {
            let mut message_count = 0;
            let max_messages = 5;

            while let Some(result) = stream.next().await {
                match result {
                    Ok((component_id, msg)) => {
                        println!("Received message with ID: {component_id}");

                        assert!(!component_id.is_empty());
                        assert_eq!(component_id, "bebop");
                        assert!(msg.header.timestamp > 0);

                        let snapshot = &msg.snapshots;
                        assert!(!snapshot.states.is_empty());

                        println!("Received {} components in this message", snapshot.states.len());
                        for (id, component_with_state) in &snapshot.states {
                            assert_eq!(
                                component_with_state
                                    .component
                                    .protocol_system,
                                "rfq:bebop"
                            );
                            assert_eq!(
                                component_with_state
                                    .component
                                    .protocol_type_name,
                                "bebop_pool"
                            );
                            assert_eq!(
                                component_with_state.component.chain,
                                Chain::Ethereum.into()
                            );

                            let attributes = &component_with_state.state.attributes;

                            assert!(attributes.contains_key("bids"));
                            assert!(attributes.contains_key("asks"));
                            assert!(!attributes["bids"].is_empty());
                            assert!(!attributes["asks"].is_empty());

                            if let Some(tvl) = component_with_state.component_tvl {
                                assert!(tvl >= 0.0);
                                println!("Component {id} TVL: ${tvl:.2}");
                            }
                        }

                        message_count += 1;
                        if message_count >= max_messages {
                            break;
                        }
                    }
                    Err(e) => {
                        panic!("Stream error: {e}");
                    }
                }
            }

            assert!(message_count > 0, "Should have received at least one message");
            println!("Successfully received {message_count} messages");
        })
        .await;

        match result {
            Ok(_) => println!("Test completed successfully"),
            Err(_) => panic!("Test timed out - no messages received within 10 seconds"),
        }
    }

    /// Checks that one stream survives a dropped connection.
    ///
    /// The mock server sends one update on its first connection and then closes
    /// it; on the second connection it sends the update again. The client must
    /// deliver both updates through the same stream after reconnecting.
    #[tokio::test]
    async fn test_websocket_reconnection() {
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap();
        let addr = listener.local_addr().unwrap();

        // Number of connections the mock server has accepted so far.
        let connection_count = Arc::new(Mutex::new(0u32));
        let connection_count_clone = connection_count.clone();

        tokio::spawn(async move {
            while let Ok((stream, _)) = listener.accept().await {
                *connection_count_clone.lock().unwrap() += 1;
                let count = *connection_count_clone.lock().unwrap();
                println!("Mock server: Connection #{count} established");

                tokio::spawn(async move {
                    if let Ok(ws_stream) = accept_async(stream).await {
                        let (mut ws_sender, _ws_receiver) = ws_stream.split();

                        let weth_addr =
                            hex::decode("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
                        let usdc_addr =
                            hex::decode("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();

                        let test_price_data = BebopPriceData {
                            base: weth_addr,
                            quote: usdc_addr,
                            last_update_ts: 1752617378,
                            bids: vec![3070.05f32, 0.325717f32],
                            asks: vec![3070.527f32, 0.325717f32],
                        };

                        let pricing_update = BebopPricingUpdate { pairs: vec![test_price_data] };

                        let test_message = pricing_update.encode_to_vec();

                        if count == 1 {
                            println!("Mock server: Connection #1 - sending message then dropping.");
                            let _ = ws_sender
                                .send(Message::Binary(test_message.clone().into()))
                                .await;

                            // Give the client time to read the message before closing.
                            tokio::time::sleep(Duration::from_millis(100)).await;
                            println!("Mock server: Dropping connection #1");
                            let _ = ws_sender.close().await;
                        } else if count == 2 {
                            println!("Mock server: Connection #2 - maintaining stable connection.");
                            let _ = ws_sender
                                .send(Message::Binary(test_message.clone().into()))
                                .await;
                        }
                    }
                });
            }
        });

        // Let the mock server start accepting before the client connects.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let mut test_quote_tokens = HashSet::new();
        test_quote_tokens
            .insert(Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap());

        let tokens_formatted = vec![
            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap(),
            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
        ];

        // Built field by field so `price_ws` can point at the local mock server.
        let client = BebopClient {
            chain: Chain::Ethereum,
            price_ws: format!("ws://127.0.0.1:{}", addr.port()),
            tokens: tokens_formatted.into_iter().collect(),
            tvl: 1000.0,
            ws_user: "test_user".to_string(),
            ws_key: "test_key".to_string(),
            quote_tokens: test_quote_tokens,
            quote_endpoint: "".to_string(),
            quote_timeout: Duration::from_secs(5),
        };

        let start_time = std::time::Instant::now();
        let mut successful_messages = 0;
        let mut connection_errors = 0;
        let mut first_message_received = false;
        let mut second_message_received = false;

        // One stream for the whole test. Calling `client.stream()` on every
        // iteration would open a fresh connection each time and never exercise
        // the stream's own reconnect logic.
        let mut stream = client.stream();

        // Expected flow: first message, server closes, the stream backs off
        // (2 seconds after the first drop) and reconnects, second message.
        while start_time.elapsed() < Duration::from_secs(5) && successful_messages < 2 {
            match timeout(Duration::from_millis(1000), stream.next()).await {
                Ok(Some(result)) => match result {
                    Ok((_component_id, _message)) => {
                        successful_messages += 1;
                        println!("Received successful message {successful_messages}");

                        if successful_messages == 1 {
                            first_message_received = true;
                            println!("First message received - connection should drop after this.");
                        } else if successful_messages == 2 {
                            second_message_received = true;
                            println!("Second message received after reconnection.");
                        }
                    }
                    Err(e) => {
                        connection_errors += 1;
                        println!("Connection error during reconnection: {e:?}");
                    }
                },
                Ok(None) => {
                    panic!("Stream ended unexpectedly");
                }
                Err(_) => {
                    println!("Timeout waiting for message (normal during reconnections)");
                    continue;
                }
            }
        }

        let final_connection_count = *connection_count.lock().unwrap();

        assert_eq!(final_connection_count, 2);
        assert!(first_message_received);
        assert!(second_message_received);
        assert_eq!(connection_errors, 0);
        assert_eq!(successful_messages, 2);
    }

    /// Live test (ignored by default): requests a WETH -> WBTC quote and checks
    /// amounts, calldata selector and partial fill offset of a single order.
    #[tokio::test]
    #[ignore]
    async fn test_bebop_quote_single_order() {
        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
        dotenv().expect("Missing .env file");
        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");

        let client = BebopClient::new(
            Chain::Ethereum,
            HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
            10.0, // TVL threshold
            auth.user,
            auth.key,
            HashSet::new(),
            Duration::from_secs(30),
        )
        .unwrap();

        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();

        let params = GetAmountOutParams {
            amount_in: BigUint::from(1_000000000000000000u64),
            token_in: token_in.clone(),
            token_out: token_out.clone(),
            sender: router.clone(),
            receiver: router,
        };
        let quote = client
            .request_binding_quote(&params)
            .await
            .unwrap();

        assert_eq!(quote.base_token, token_in);
        assert_eq!(quote.quote_token, token_out);
        assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));

        assert!(quote.amount_out > BigUint::from(3000000u64));

        // The first four calldata bytes are the function selector.
        assert_eq!(
            quote
                .quote_attributes
                .get("calldata")
                .unwrap()[..4],
            Bytes::from_str("0x4dcebcba")
                .unwrap()
                .to_vec()
        );
        let partial_fill_offset_slice = quote
            .quote_attributes
            .get("partial_fill_offset")
            .unwrap()
            .as_ref();
        let mut partial_fill_offset_array = [0u8; 8];
        partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);

        assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 12);
    }

    /// Live test (ignored by default): requests a quote expected to come back as
    /// an aggregate order and checks amounts, selector and partial fill offset.
    #[tokio::test]
    #[ignore]
    async fn test_bebop_quote_aggregate_order() {
        let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
        let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
        dotenv().expect("Missing .env file");
        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");

        let client = BebopClient::new(
            Chain::Ethereum,
            HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
            10.0, // TVL threshold
            auth.user,
            auth.key,
            HashSet::new(),
            Duration::from_secs(30),
        )
        .unwrap();

        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();

        let amount_in = BigUint::from_str("20_000_000_000").unwrap();
        let params = GetAmountOutParams {
            amount_in: amount_in.clone(),
            token_in: token_in.clone(),
            token_out: token_out.clone(),
            sender: router.clone(),
            receiver: router,
        };
        let quote = client
            .request_binding_quote(&params)
            .await
            .unwrap();

        assert_eq!(quote.base_token, token_in);
        assert_eq!(quote.quote_token, token_out);
        assert_eq!(quote.amount_in, amount_in);

        assert!(quote.amount_out > BigUint::from_str("18000000000000000000000").unwrap());

        assert_eq!(
            quote
                .quote_attributes
                .get("calldata")
                .unwrap()[..4],
            Bytes::from_str("0xa2f74893")
                .unwrap()
                .to_vec()
        );
        let partial_fill_offset_slice = quote
            .quote_attributes
            .get("partial_fill_offset")
            .unwrap()
            .as_ref();
        let mut partial_fill_offset_array = [0u8; 8];
        partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);

        assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 2);
    }

    /// Parses the recorded aggregate-order response and checks the summed amounts.
    #[test]
    fn test_process_bebop_quote_response_aggregate_order() {
        let json =
            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
                .unwrap();
        let quote_response: BebopQuoteResponse = serde_json::from_str(&json).unwrap();
        let params = GetAmountOutParams {
            amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
            token_in: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
            token_out: Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap(),
            sender: Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap(),
            receiver: Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap(),
        };
        let res = BebopClient::process_quote_response(quote_response, &params).unwrap();
        assert_eq!(res.amount_out, BigUint::from_str("21700473797683400419007").unwrap());
        assert_eq!(res.amount_in, BigUint::from_str("20000000000").unwrap());
        assert_eq!(res.base_token, params.token_in);
        assert_eq!(res.quote_token, params.token_out);
    }

    /// Parses the recorded multihop aggregate-order response and checks that
    /// only the entries for the requested in/out tokens are counted.
    #[test]
    fn test_process_bebop_quote_response_aggregate_order_with_multihop() {
        let json = std::fs::read_to_string(
            "src/rfq/protocols/bebop/test_responses/aggregate_order_with_multihop.json",
        )
        .unwrap();
        let quote_response: BebopQuoteResponse = serde_json::from_str(&json).unwrap();
        let params = GetAmountOutParams {
            amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
            token_in: Bytes::from_str("0xDEf1CA1fb7FBcDC777520aa7f396b4E015F497aB").unwrap(),
            token_out: Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(),
            sender: Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap(),
            receiver: Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap(),
        };
        let res = BebopClient::process_quote_response(quote_response, &params).unwrap();
        assert_eq!(res.amount_out, BigUint::from_str("11186653890").unwrap());
        assert_eq!(res.amount_in, BigUint::from_str("43067495979235520920162").unwrap());
        assert_eq!(res.base_token, params.token_in);
        assert_eq!(res.quote_token, params.token_out);
    }

    /// Starts a raw TCP server that waits `delay_ms` before answering every
    /// connection with the recorded aggregate-order JSON as an HTTP 200.
    async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
        use tokio::io::AsyncWriteExt;

        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap();
        let addr = listener.local_addr().unwrap();

        let json_response =
            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
                .unwrap();

        tokio::spawn(async move {
            while let Ok((mut stream, _)) = listener.accept().await {
                let json_response_clone = json_response.clone();
                tokio::spawn(async move {
                    sleep(Duration::from_millis(delay_ms)).await;

                    let response = format!(
                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
                        json_response_clone.len(),
                        json_response_clone
                    );
                    let _ = stream
                        .write_all(response.as_bytes())
                        .await;
                    let _ = stream.flush().await;
                    let _ = stream.shutdown().await;
                });
            }
        });

        addr
    }

    /// Builds a client with dummy credentials whose quote endpoint and timeout
    /// are supplied by the caller.
    fn create_test_bebop_client(quote_endpoint: String, quote_timeout: Duration) -> BebopClient {
        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();

        BebopClient {
            chain: Chain::Ethereum,
            price_ws: "ws://example.com".to_string(),
            quote_endpoint,
            tokens: HashSet::from([token_in, token_out]),
            tvl: 10.0,
            ws_user: "test_user".to_string(),
            ws_key: "test_key".to_string(),
            quote_tokens: HashSet::new(),
            quote_timeout,
        }
    }

    /// Quote parameters whose tokens match the recorded aggregate-order response.
    fn create_test_quote_params() -> GetAmountOutParams {
        let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
        let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();

        GetAmountOutParams {
            amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
            token_in,
            token_out,
            sender: router.clone(),
            receiver: router,
        }
    }

    /// A 200 ms budget against a server answering after 500 ms must time out;
    /// a 1 s budget against the same server must succeed.
    #[tokio::test]
    async fn test_bebop_quote_timeout() {
        let addr = create_delayed_response_server(500).await;

        let client_short_timeout = create_test_bebop_client(
            format!("http://127.0.0.1:{}/quote", addr.port()),
            Duration::from_millis(200),
        );
        let params = create_test_quote_params();

        let start = std::time::Instant::now();
        let result = client_short_timeout
            .request_binding_quote(&params)
            .await;
        let elapsed = start.elapsed();

        assert!(result.is_err());
        let err = result.unwrap_err();
        match err {
            RFQError::ConnectionError(msg) => {
                assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
            }
            _ => panic!("Expected ConnectionError, got: {:?}", err),
        }
        assert!(
            elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
            "Expected timeout around 200ms, got: {:?}",
            elapsed
        );

        let client_long_timeout = create_test_bebop_client(
            format!("http://127.0.0.1:{}/quote", addr.port()),
            Duration::from_secs(1),
        );

        let result = client_long_timeout
            .request_binding_quote(&params)
            .await;

        assert!(result.is_ok(), "Expected success, got: {:?}", result);
        let quote = result.unwrap();

        assert_eq!(quote.base_token, params.token_in);
        assert_eq!(quote.quote_token, params.token_out);
    }

    /// Starts a raw TCP server that answers the first two requests with a
    /// non-JSON HTTP 500 and every later one with the recorded aggregate-order
    /// JSON. Returns the address and a shared counter of requests received.
    async fn create_retry_server() -> (std::net::SocketAddr, Arc<Mutex<u32>>) {
        use tokio::io::AsyncWriteExt;

        let request_count = Arc::new(Mutex::new(0u32));
        let request_count_clone = request_count.clone();

        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap();
        let addr = listener.local_addr().unwrap();

        let json_response =
            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
                .unwrap();

        tokio::spawn(async move {
            while let Ok((mut stream, _)) = listener.accept().await {
                let count_clone = request_count_clone.clone();
                let json_response_clone = json_response.clone();
                tokio::spawn(async move {
                    *count_clone.lock().unwrap() += 1;
                    let count = *count_clone.lock().unwrap();
                    println!("Mock server: Received request #{count}");

                    if count <= 2 {
                        let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
                        let _ = stream
                            .write_all(response.as_bytes())
                            .await;
                    } else {
                        let response = format!(
                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
                            json_response_clone.len(),
                            json_response_clone
                        );
                        let _ = stream
                            .write_all(response.as_bytes())
                            .await;
                    }
                    let _ = stream.flush().await;
                    let _ = stream.shutdown().await;
                });
            }
        });
        (addr, request_count)
    }

    /// Two unparsable responses followed by a good one must succeed on the
    /// third request.
    #[tokio::test]
    async fn test_bebop_quote_retry_on_bad_response() {
        let (addr, request_count) = create_retry_server().await;

        let client = create_test_bebop_client(
            format!("http://127.0.0.1:{}/quote", addr.port()),
            Duration::from_secs(5),
        );
        let params = create_test_quote_params();
        let result = client
            .request_binding_quote(&params)
            .await;

        assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
        let quote = result.unwrap();

        assert_eq!(quote.amount_in, BigUint::from_str("20000000000").unwrap());
        assert_eq!(quote.amount_out, BigUint::from_str("21700473797683400419007").unwrap());

        let final_count = *request_count.lock().unwrap();
        assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
    }

    /// Serializing and deserializing keeps every field except the credentials,
    /// which are skipped on output and default to empty strings on input.
    #[test]
    fn test_bebop_client_serialize_deserialize_roundtrip() {
        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
        let quote_token = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();

        let original = BebopClient {
            chain: Chain::Ethereum,
            price_ws: "wss://api.bebop.xyz/pricing".to_string(),
            quote_endpoint: "https://api.bebop.xyz/quote".to_string(),
            tokens: HashSet::from([token_in.clone(), token_out.clone()]),
            tvl: 50.5,
            ws_user: "secret_user".to_string(),
            ws_key: "secret_key".to_string(),
            quote_tokens: HashSet::from([quote_token.clone()]),
            quote_timeout: Duration::from_millis(5500),
        };

        let serialized = serde_json::to_string(&original).unwrap();
        let deserialized: BebopClient = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.chain, original.chain);
        assert_eq!(deserialized.price_ws, original.price_ws);
        assert_eq!(deserialized.quote_endpoint, original.quote_endpoint);
        assert_eq!(deserialized.tokens, original.tokens);
        assert_eq!(deserialized.tvl, original.tvl);
        assert_eq!(deserialized.quote_tokens, original.quote_tokens);
        assert_eq!(deserialized.quote_timeout, original.quote_timeout);

        // Credentials must not survive the round trip.
        assert_eq!(deserialized.ws_user, "");
        assert_eq!(deserialized.ws_key, "");
        assert_ne!(deserialized.ws_user, original.ws_user);
        assert_ne!(deserialized.ws_key, original.ws_key);
    }

    /// Credentials present in the input JSON are accepted on deserialization.
    #[test]
    fn test_bebop_client_deserialize_with_credentials() {
        let json = r#"{
            "chain": "ethereum",
            "price_ws": "wss://api.bebop.xyz/pricing",
            "quote_endpoint": "https://api.bebop.xyz/quote",
            "tokens": [],
            "tvl": 10.0,
            "ws_user": "provided_user",
            "ws_key": "provided_key",
            "quote_tokens": [],
            "quote_timeout": {"secs": 30, "nanos": 0}
        }"#;

        let client: BebopClient = serde_json::from_str(json).unwrap();

        assert_eq!(client.ws_user, "provided_user");
        assert_eq!(client.ws_key, "provided_key");
    }
}