tycho_simulation/rfq/protocols/bebop/
client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address};
8use async_trait::async_trait;
9use futures::{stream::BoxStream, StreamExt};
10use http::Request;
11use num_bigint::BigUint;
12use prost::Message as ProstMessage;
13use reqwest::Client;
14use tokio::time::{sleep, timeout, Duration};
15use tokio_tungstenite::{
16    connect_async_with_config,
17    tungstenite::{handshake::client::generate_key, Message},
18};
19use tracing::{error, info, warn};
20use tycho_common::{
21    models::{protocol::GetAmountOutParams, Chain},
22    simulation::indicatively_priced::SignedQuote,
23    Bytes,
24};
25
26use crate::{
27    rfq::{
28        client::RFQClient,
29        errors::RFQError,
30        models::TimestampHeader,
31        protocols::bebop::models::{
32            BebopOrderToSign, BebopPriceData, BebopPricingUpdate, BebopQuoteResponse,
33        },
34    },
35    tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
36    tycho_common::dto::{ProtocolComponent, ResponseProtocolState},
37};
38
39fn bytes_to_address(address: &Bytes) -> Result<Address, RFQError> {
40    if address.len() == 20 {
41        Ok(Address::from_slice(address))
42    } else {
43        Err(RFQError::InvalidInput(format!("Invalid ERC20 token address: {address:?}")))
44    }
45}
46
47/// Maps a Chain to its corresponding Bebop WebSocket URL
48fn chain_to_bebop_url(chain: Chain) -> Result<String, RFQError> {
49    let chain_path = match chain {
50        Chain::Ethereum => "ethereum",
51        Chain::Base => "base",
52        _ => return Err(RFQError::FatalError(format!("Unsupported chain: {chain:?}"))),
53    };
54    let url = format!("api.bebop.xyz/pmm/{chain_path}/v3");
55    Ok(url)
56}
57
58#[derive(Clone, Debug)]
59pub struct BebopClient {
60    chain: Chain,
61    price_ws: String,
62    quote_endpoint: String,
63    // Tokens that we want prices for
64    tokens: HashSet<Bytes>,
65    // Min tvl value in the quote token.
66    tvl: f64,
67    // name header for authentication
68    ws_user: String,
69    // key header for authentication
70    ws_key: String,
71    // quote tokens to normalize to for TVL purposes. Should have the same prices.
72    quote_tokens: HashSet<Bytes>,
73    quote_timeout: Duration,
74}
75
76impl BebopClient {
77    pub const PROTOCOL_SYSTEM: &'static str = "rfq:bebop";
78
79    pub fn new(
80        chain: Chain,
81        tokens: HashSet<Bytes>,
82        tvl: f64,
83        ws_user: String,
84        ws_key: String,
85        quote_tokens: HashSet<Bytes>,
86        quote_timeout: Duration,
87    ) -> Result<Self, RFQError> {
88        let url = chain_to_bebop_url(chain)?;
89        Ok(Self {
90            price_ws: "wss://".to_string() + &url + "/pricing?format=protobuf",
91            quote_endpoint: "https://".to_string() + &url + "/quote",
92            tokens,
93            chain,
94            tvl,
95            ws_user,
96            ws_key,
97            quote_tokens,
98            quote_timeout,
99        })
100    }
101
102    fn create_component_with_state(
103        &self,
104        component_id: String,
105        tokens: Vec<tycho_common::Bytes>,
106        price_data: &BebopPriceData,
107        tvl: f64,
108    ) -> ComponentWithState {
109        let protocol_component = ProtocolComponent {
110            id: component_id.clone(),
111            protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
112            protocol_type_name: "bebop_pool".to_string(),
113            chain: self.chain.into(),
114            tokens,
115            contract_ids: vec![], // empty for RFQ
116            static_attributes: Default::default(),
117            change: Default::default(),
118            creation_tx: Default::default(),
119            created_at: Default::default(),
120        };
121
122        let mut attributes = HashMap::new();
123
124        // Store all bids and asks as JSON strings, since we cannot store arrays
125        // Convert flat arrays [price1, size1, price2, size2, ...] to pairs [(price1, size1),
126        // (price2, size2), ...]
127        if !price_data.bids.is_empty() {
128            let bids_pairs: Vec<(f32, f32)> = price_data
129                .bids
130                .chunks_exact(2)
131                .map(|chunk| (chunk[0], chunk[1]))
132                .collect();
133            let bids_json = serde_json::to_string(&bids_pairs).unwrap_or_default();
134            attributes.insert("bids".to_string(), bids_json.as_bytes().to_vec().into());
135        }
136        if !price_data.asks.is_empty() {
137            let asks_pairs: Vec<(f32, f32)> = price_data
138                .asks
139                .chunks_exact(2)
140                .map(|chunk| (chunk[0], chunk[1]))
141                .collect();
142            let asks_json = serde_json::to_string(&asks_pairs).unwrap_or_default();
143            attributes.insert("asks".to_string(), asks_json.as_bytes().to_vec().into());
144        }
145
146        ComponentWithState {
147            state: ResponseProtocolState {
148                component_id: component_id.clone(),
149                attributes,
150                balances: HashMap::new(),
151            },
152            component: protocol_component,
153            component_tvl: Some(tvl),
154            entrypoints: vec![],
155        }
156    }
157
158    fn process_quote_response(
159        quote_response: BebopQuoteResponse,
160        params: &GetAmountOutParams,
161    ) -> Result<SignedQuote, RFQError> {
162        match quote_response {
163            BebopQuoteResponse::Success(quote) => {
164                quote.validate(params)?;
165
166                let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
167                quote_attributes.insert("calldata".into(), quote.tx.data);
168                quote_attributes.insert(
169                    "partial_fill_offset".into(),
170                    Bytes::from(
171                        quote
172                            .partial_fill_offset
173                            .to_be_bytes()
174                            .to_vec(),
175                    ),
176                );
177                let signed_quote = match quote.to_sign {
178                    BebopOrderToSign::Single(ref single) => SignedQuote {
179                        base_token: params.token_in.clone(),
180                        quote_token: params.token_out.clone(),
181                        amount_in: BigUint::from_str(&single.taker_amount).map_err(|_| {
182                            RFQError::ParsingError(format!(
183                                "Failed to parse amount in string: {}",
184                                single.taker_amount
185                            ))
186                        })?,
187                        amount_out: BigUint::from_str(&single.maker_amount).map_err(|_| {
188                            RFQError::ParsingError(format!(
189                                "Failed to parse amount out string: {}",
190                                single.maker_amount
191                            ))
192                        })?,
193                        quote_attributes,
194                    },
195                    BebopOrderToSign::Aggregate(aggregate) => {
196                        // Sum taker_amounts for taker_tokens matching the token_in
197                        let amount_in: BigUint = aggregate
198                            .taker_tokens
199                            .iter()
200                            .zip(&aggregate.taker_amounts)
201                            .flat_map(|(tokens, amounts)| {
202                                tokens
203                                    .iter()
204                                    .zip(amounts)
205                                    .filter_map(|(token, amount)| {
206                                        if token == &params.token_in {
207                                            BigUint::from_str(amount).ok()
208                                        } else {
209                                            None
210                                        }
211                                    })
212                            })
213                            .sum();
214
215                        // Sum maker_amounts for maker_tokens matching the token_out
216                        let amount_out: BigUint = aggregate
217                            .maker_tokens
218                            .iter()
219                            .zip(&aggregate.maker_amounts)
220                            .flat_map(|(tokens, amounts)| {
221                                tokens
222                                    .iter()
223                                    .zip(amounts)
224                                    .filter_map(|(token, amount)| {
225                                        if token == &params.token_out {
226                                            BigUint::from_str(amount).ok()
227                                        } else {
228                                            None
229                                        }
230                                    })
231                            })
232                            .sum();
233
234                        SignedQuote {
235                            base_token: params.token_in.clone(),
236                            quote_token: params.token_out.clone(),
237                            amount_in,
238                            amount_out,
239                            quote_attributes,
240                        }
241                    }
242                };
243
244                Ok(signed_quote)
245            }
246            BebopQuoteResponse::Error(err) => Err(RFQError::FatalError(format!(
247                "Bebop API error: code {} - {} (requestId: {})",
248                err.error.error_code, err.error.message, err.error.request_id
249            ))),
250        }
251    }
252}
253
254#[async_trait]
255impl RFQClient for BebopClient {
256    fn stream(
257        &self,
258    ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
259        let tokens = self.tokens.clone();
260        let url = self.price_ws.clone();
261        let tvl_threshold = self.tvl;
262        let name = self.ws_user.clone();
263        let authorization = self.ws_key.clone();
264        let client = self.clone();
265
266        Box::pin(async_stream::stream! {
267            let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
268            let mut reconnect_attempts = 0;
269            const MAX_RECONNECT_ATTEMPTS: u32 = 10;
270
271            loop {
272                let request = Request::builder()
273                    .method("GET")
274                    .uri(&url)
275                    .header("Host", "api.bebop.xyz")
276                    .header("Upgrade", "websocket")
277                    .header("Connection", "Upgrade")
278                    .header("Sec-WebSocket-Key", generate_key())
279                    .header("Sec-WebSocket-Version", "13")
280                    .header("name", &name)
281                    .header("Authorization", &authorization)
282                    .body(())
283                    .map_err(|_| RFQError::FatalError("Failed to build request".into()))?;
284
285                // Connect to Bebop WebSocket with custom headers
286                let (ws_stream, _) = match connect_async_with_config(request, None, false).await {
287                    Ok(connection) => {
288                        info!("Successfully connected to Bebop WebSocket");
289                        reconnect_attempts = 0; // Reset counter on successful connection
290                        connection
291                    },
292                    Err(e) => {
293                        reconnect_attempts += 1;
294                        error!("Failed to connect to Bebop WebSocket (attempt {}): {}", reconnect_attempts, e);
295
296                        if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
297                            yield Err(RFQError::ConnectionError(format!("Failed to connect after {MAX_RECONNECT_ATTEMPTS} attempts: {e}")));
298                            return;
299                        }
300
301                        let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
302                        info!("Retrying connection in {} seconds...", backoff_duration.as_secs());
303                        sleep(backoff_duration).await;
304                        continue;
305                    }
306                };
307
308                let (_, mut ws_receiver) = ws_stream.split();
309
310                // Message processing loop
311                while let Some(msg) = ws_receiver.next().await {
312                    match msg {
313                        Ok(Message::Binary(data)) => {
314                            match BebopPricingUpdate::decode(&data[..]) {
315                                Ok(protobuf_update) => {
316                                    let mut new_components = HashMap::new();
317
318                                    // Process all pairs directly from protobuf
319                                    for price_data in &protobuf_update.pairs {
320                                        let base_bytes = Bytes::from(price_data.base.clone());
321                                        let quote_bytes = Bytes::from(price_data.quote.clone());
322                                        if tokens.contains(&base_bytes) && tokens.contains(&quote_bytes) {
323                                            let pair_tokens = vec![
324                                                base_bytes.clone(), quote_bytes.clone()
325                                            ];
326
327                                            let mut quote_price_data: Option<&BebopPriceData> = None;
328                                            // The quote token is not one of the approved quote tokens
329                                            // Get the price, so we can normalize our TVL calculation
330                                            if !client.quote_tokens.contains(&quote_bytes) {
331                                                for approved_quote_token in &client.quote_tokens {
332                                                    // Look for a pair containing both our quote token and an approved token
333                                                    // Can be either QUOTE/APPROVED or APPROVED/QUOTE
334                                                    if let Some(quote_data) = protobuf_update.pairs.iter()
335                                                        .find(|p| {
336                                                            (p.base == quote_bytes.as_ref() && p.quote == approved_quote_token.as_ref()) ||
337                                                            (p.quote == quote_bytes.as_ref() && p.base == approved_quote_token.as_ref())
338                                                        }) {
339                                                        quote_price_data = Some(quote_data);
340                                                        break;
341                                                    }
342                                                }
343
344                                                // Quote token doesn't have price levels in approved quote tokens.
345                                                // Skip.
346                                                if quote_price_data.is_none() {
347                                                    warn!("Quote token {} does not have price levels in approved quote token. Skipping.", hex::encode(&quote_bytes));
348                                                    continue;
349                                                }
350                                            }
351
352                                            let tvl = price_data.calculate_tvl(quote_price_data);
353                                            if tvl < tvl_threshold {
354                                                continue;
355                                            }
356
357                                            let pair_str = format!("bebop_{}/{}", hex::encode(&base_bytes), hex::encode(&quote_bytes));
358                                            let component_id = format!("{}", keccak256(pair_str.as_bytes()));
359                                            let component_with_state = client.create_component_with_state(
360                                                component_id.clone(),
361                                                pair_tokens,
362                                                price_data,
363                                                tvl
364                                            );
365                                            new_components.insert(component_id, component_with_state);
366                                        }
367                                    }
368
369                                    // Find components that were removed (existed before but not in this update)
370                                    // This includes components with no bids or asks, since they are filtered
371                                    // out by the tvl threshold.
372                                    let removed_components: HashMap<String, ProtocolComponent> = current_components
373                                        .iter()
374                                        .filter(|&(id, _)| !new_components.contains_key(id))
375                                        .map(|(k, v)| (k.clone(), v.component.clone()))
376                                        .collect();
377
378                                    // Update our current state
379                                    current_components = new_components.clone();
380
381                                    let snapshot = Snapshot {
382                                        states: new_components,
383                                        vm_storage: HashMap::new(),
384                                    };
385                                    let timestamp = SystemTime::now().duration_since(
386                                        SystemTime::UNIX_EPOCH
387                                    ).map_err(
388                                        |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
389                                    )?.as_secs();
390
391                                    let msg = StateSyncMessage::<TimestampHeader> {
392                                        header: TimestampHeader { timestamp },
393                                        snapshots: snapshot,
394                                        deltas: None, // Deltas are always None - all the changes are absolute
395                                        removed_components,
396                                    };
397
398                                    // Yield one message containing all updated pairs
399                                    yield Ok(("bebop".to_string(), msg));
400                                },
401                                Err(e) => {
402                                    error!("Failed to parse protobuf message: {}", e);
403                                    break;
404                                }
405                            }
406                        }
407                        Ok(Message::Close(_)) => {
408                            info!("WebSocket connection closed by server");
409                            break;
410                        }
411                        Err(e) => {
412                            error!("WebSocket error: {}", e);
413                            break;
414                        }
415                        _ => {} // Ignore other message types
416                    }
417                }
418
419                // If we're here, the message loop exited - always attempt to reconnect
420                reconnect_attempts += 1;
421                if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
422                    yield Err(RFQError::ConnectionError(format!("Connection failed after {MAX_RECONNECT_ATTEMPTS} attempts")));
423                    return;
424                }
425
426                let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
427                info!("Reconnecting in {} seconds (attempt {})...", backoff_duration.as_secs(), reconnect_attempts);
428                sleep(backoff_duration).await;
429                // Continue to the next iteration of the main loop
430            }
431        })
432    }
433
434    async fn request_binding_quote(
435        &self,
436        params: &GetAmountOutParams,
437    ) -> Result<SignedQuote, RFQError> {
438        let sell_token = bytes_to_address(&params.token_in)?.to_string();
439        let buy_token = bytes_to_address(&params.token_out)?.to_string();
440        let sell_amount = params.amount_in.to_string();
441        let sender = bytes_to_address(&params.sender)?.to_string();
442        let receiver = bytes_to_address(&params.receiver)?.to_string();
443
444        let url = self.quote_endpoint.clone();
445
446        let client = Client::new();
447
448        let start_time = std::time::Instant::now();
449        const MAX_RETRIES: u32 = 3;
450        let mut last_error = None;
451
452        for attempt in 0..MAX_RETRIES {
453            // Check if we have time remaining for this attempt
454            let elapsed = start_time.elapsed();
455            if elapsed >= self.quote_timeout {
456                return Err(last_error.unwrap_or_else(|| {
457                    RFQError::ConnectionError(format!(
458                        "Bebop quote request timed out after {} seconds",
459                        self.quote_timeout.as_secs()
460                    ))
461                }));
462            }
463
464            let remaining_time = self.quote_timeout - elapsed;
465
466            let request = client
467                .get(&url)
468                .query(&[
469                    ("sell_tokens", sell_token.clone()),
470                    ("buy_tokens", buy_token.clone()),
471                    ("sell_amounts", sell_amount.clone()),
472                    ("taker_address", sender.clone()),
473                    ("receiver_address", receiver.clone()),
474                    ("approval_type", "Standard".into()),
475                    ("skip_validation", "true".into()),
476                    ("skip_taker_checks", "true".into()),
477                    ("gasless", "false".into()),
478                    ("expiry_type", "standard".into()),
479                    ("fee", "0".into()),
480                    ("is_ui", "false".into()),
481                    ("source", self.ws_user.clone()),
482                ])
483                .header("accept", "application/json")
484                .header("name", &self.ws_user)
485                .header("source-auth", &self.ws_key)
486                .header("Authorization", &self.ws_key);
487
488            let response = match timeout(remaining_time, request.send()).await {
489                Ok(Ok(resp)) => resp,
490                Ok(Err(e)) => {
491                    warn!(
492                        "Bebop quote request failed (attempt {}/{}): {}",
493                        attempt + 1,
494                        MAX_RETRIES,
495                        e
496                    );
497                    last_error = Some(RFQError::ConnectionError(format!(
498                        "Failed to send Bebop quote request: {e}"
499                    )));
500                    if attempt < MAX_RETRIES - 1 {
501                        continue;
502                    } else {
503                        return Err(last_error.unwrap());
504                    }
505                }
506                Err(_) => {
507                    return Err(RFQError::ConnectionError(format!(
508                        "Bebop quote request timed out after {} seconds",
509                        self.quote_timeout.as_secs()
510                    )));
511                }
512            };
513
514            let quote_response = match response
515                .json::<BebopQuoteResponse>()
516                .await
517            {
518                Ok(resp) => resp,
519                Err(e) => {
520                    warn!(
521                        "Bebop quote response parsing failed (attempt {}/{}): {}",
522                        attempt + 1,
523                        MAX_RETRIES,
524                        e
525                    );
526                    last_error = Some(RFQError::ParsingError(format!(
527                        "Failed to parse Bebop quote response: {e}"
528                    )));
529                    if attempt < MAX_RETRIES - 1 {
530                        sleep(Duration::from_millis(100)).await;
531                        continue;
532                    } else {
533                        return Err(last_error.unwrap());
534                    }
535                }
536            };
537
538            return Self::process_quote_response(quote_response, params);
539        }
540
541        Err(last_error.unwrap_or_else(|| {
542            RFQError::ConnectionError("Bebop quote request failed after retries".to_string())
543        }))
544    }
545}
546
547#[cfg(test)]
548mod tests {
549    use std::{
550        sync::{Arc, Mutex},
551        time::Duration,
552    };
553
554    use dotenv::dotenv;
555    use futures::SinkExt;
556    use tokio::{net::TcpListener, time::timeout};
557    use tokio_tungstenite::accept_async;
558
559    use super::*;
560    use crate::rfq::constants::get_bebop_auth;
561
562    #[tokio::test]
563    #[ignore] // Requires network access and setting proper env vars
564    async fn test_bebop_websocket_connection() {
565        // We test with quote tokens that are not USDC in order to ensure our normalization works
566        // fine
567        let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
568        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
569
570        dotenv().expect("Missing .env file");
571        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
572
573        let quote_tokens = HashSet::from([
574            // Use addresses we forgot to checksum (to test checksumming)
575            Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(), // USDC
576            Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), // USDT
577        ]);
578
579        let client = BebopClient::new(
580            Chain::Ethereum,
581            HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
582            10.0, // $10 minimum TVL
583            auth.user,
584            auth.key,
585            quote_tokens,
586            Duration::from_secs(30),
587        )
588        .unwrap();
589
590        let mut stream = client.stream();
591
592        // Test connection and message reception with timeout
593        let result = timeout(Duration::from_secs(10), async {
594            let mut message_count = 0;
595            let max_messages = 5;
596
597            while let Some(result) = stream.next().await {
598                match result {
599                    Ok((component_id, msg)) => {
600                        println!("Received message with ID: {component_id}");
601
602                        assert!(!component_id.is_empty());
603                        assert_eq!(component_id, "bebop");
604                        assert!(msg.header.timestamp > 0);
605                        assert!(!msg.snapshots.states.is_empty());
606
607                        let snapshot = &msg.snapshots;
608
609                        // We got at least one component
610                        assert!(!snapshot.states.is_empty());
611
612                        println!("Received {} components in this message", snapshot.states.len());
613                        for (id, component_with_state) in &snapshot.states {
614                            assert_eq!(
615                                component_with_state
616                                    .component
617                                    .protocol_system,
618                                "rfq:bebop"
619                            );
620                            assert_eq!(
621                                component_with_state
622                                    .component
623                                    .protocol_type_name,
624                                "bebop_pool"
625                            );
626                            assert_eq!(
627                                component_with_state.component.chain,
628                                Chain::Ethereum.into()
629                            );
630
631                            let attributes = &component_with_state.state.attributes;
632
633                            // Check that bids and asks exist and have non-empty byte strings
634                            assert!(attributes.contains_key("bids"));
635                            assert!(attributes.contains_key("asks"));
636                            assert!(!attributes["bids"].is_empty());
637                            assert!(!attributes["asks"].is_empty());
638
639                            if let Some(tvl) = component_with_state.component_tvl {
640                                assert!(tvl >= 0.0);
641                                println!("Component {id} TVL: ${tvl:.2}");
642                            }
643                        }
644
645                        message_count += 1;
646                        if message_count >= max_messages {
647                            break;
648                        }
649                    }
650                    Err(e) => {
651                        panic!("Stream error: {e}");
652                    }
653                }
654            }
655
656            assert!(message_count > 0, "Should have received at least one message");
657            println!("Successfully received {message_count} messages");
658        })
659        .await;
660
661        match result {
662            Ok(_) => println!("Test completed successfully"),
663            Err(_) => panic!("Test timed out - no messages received within 10 seconds"),
664        }
665    }
666
667    #[tokio::test]
668    async fn test_websocket_reconnection() {
669        // Start a mock WebSocket server that will drop connections intermittently
670        let listener = TcpListener::bind("127.0.0.1:0")
671            .await
672            .unwrap();
673        let addr = listener.local_addr().unwrap();
674
675        // Creates a thread-safe counter.
676        let connection_count = Arc::new(Mutex::new(0u32));
677
678        // We must clone - since we want to read the original value at the end of the test.
679        let connection_count_clone = connection_count.clone();
680
681        tokio::spawn(async move {
682            while let Ok((stream, _)) = listener.accept().await {
683                *connection_count_clone.lock().unwrap() += 1;
684                let count = *connection_count_clone.lock().unwrap();
685                println!("Mock server: Connection #{count} established");
686
687                tokio::spawn(async move {
688                    if let Ok(ws_stream) = accept_async(stream).await {
689                        let (mut ws_sender, _ws_receiver) = ws_stream.split();
690
691                        // Create test protobuf message
692                        let weth_addr =
693                            hex::decode("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
694                        let usdc_addr =
695                            hex::decode("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
696
697                        let test_price_data = BebopPriceData {
698                            base: weth_addr,
699                            quote: usdc_addr,
700                            last_update_ts: 1752617378,
701                            bids: vec![3070.05f32, 0.325717f32],
702                            asks: vec![3070.527f32, 0.325717f32],
703                        };
704
705                        let pricing_update = BebopPricingUpdate { pairs: vec![test_price_data] };
706
707                        let test_message = pricing_update.encode_to_vec();
708
709                        if count == 1 {
710                            // First connection: Send message successfully, then drop
711                            println!("Mock server: Connection #1 - sending message then dropping.");
712                            let _ = ws_sender
713                                .send(Message::Binary(test_message.clone().into()))
714                                .await;
715
716                            // Give time for message to be processed, then drop the connection.
717                            tokio::time::sleep(Duration::from_millis(100)).await;
718                            println!("Mock server: Dropping connection #1");
719                            let _ = ws_sender.close().await;
720                        } else if count == 2 {
721                            // Second connection: Send message successfully and maintain connection
722                            println!("Mock server: Connection #2 - maintaining stable connection.");
723                            let _ = ws_sender
724                                .send(Message::Binary(test_message.clone().into()))
725                                .await;
726                        }
727                    }
728                });
729            }
730        });
731
732        // Wait a moment for the server to start
733        tokio::time::sleep(Duration::from_millis(50)).await;
734
735        let mut test_quote_tokens = HashSet::new();
736        test_quote_tokens
737            .insert(Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap());
738
739        let tokens_formatted = vec![
740            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap(),
741            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
742        ];
743
744        // Bypass the new() constructor to mock the URL to point to our mock server.
745        let client = BebopClient {
746            chain: Chain::Ethereum,
747            price_ws: format!("ws://127.0.0.1:{}", addr.port()),
748            tokens: tokens_formatted.into_iter().collect(),
749            tvl: 1000.0,
750            ws_user: "test_user".to_string(),
751            ws_key: "test_key".to_string(),
752            quote_tokens: test_quote_tokens,
753            quote_endpoint: "".to_string(),
754            quote_timeout: Duration::from_secs(5),
755        };
756
757        let start_time = std::time::Instant::now();
758        let mut successful_messages = 0;
759        let mut connection_errors = 0;
760        let mut first_message_received = false;
761        let mut second_message_received = false;
762
763        // Expected flow:
764        // 1. Receive first message successfully
765        // 2. Connection drops
766        // 3. Client reconnects
767        // 4. Receive second message successfully
768        // Timeout if two messages are not received within 5 seconds.
769        while start_time.elapsed() < Duration::from_secs(5) && successful_messages < 2 {
770            match timeout(Duration::from_millis(1000), client.stream().next()).await {
771                Ok(Some(result)) => match result {
772                    Ok((_component_id, _message)) => {
773                        successful_messages += 1;
774                        println!("Received successful message {successful_messages}");
775
776                        if successful_messages == 1 {
777                            first_message_received = true;
778                            println!("First message received - connection should drop after this.");
779                        } else if successful_messages == 2 {
780                            second_message_received = true;
781                            println!("Second message received after reconnection.");
782                        }
783                    }
784                    Err(e) => {
785                        connection_errors += 1;
786                        println!("Connection error during reconnection: {e:?}");
787                    }
788                },
789                Ok(None) => {
790                    panic!("Stream ended unexpectedly");
791                }
792                Err(_) => {
793                    println!("Timeout waiting for message (normal during reconnections)");
794                    continue;
795                }
796            }
797        }
798
799        let final_connection_count = *connection_count.lock().unwrap();
800
801        // 1. Exactly 2 connection attempts (initial + reconnect)
802        // 2. Exactly 2 successful messages (one before drop, one after reconnect)
803
804        assert_eq!(final_connection_count, 2);
805        assert!(first_message_received);
806        assert!(second_message_received);
807        assert_eq!(connection_errors, 0);
808        assert_eq!(successful_messages, 2);
809    }
810
811    #[tokio::test]
812    #[ignore] // Requires network access and setting proper env vars
813    async fn test_bebop_quote_single_order() {
814        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
815        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
816        dotenv().expect("Missing .env file");
817        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
818
819        let client = BebopClient::new(
820            Chain::Ethereum,
821            HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
822            10.0, // $10 minimum TVL
823            auth.user,
824            auth.key,
825            HashSet::new(),
826            Duration::from_secs(30),
827        )
828        .unwrap();
829
830        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
831
832        let params = GetAmountOutParams {
833            amount_in: BigUint::from(1_000000000000000000u64),
834            token_in: token_in.clone(),
835            token_out: token_out.clone(),
836            sender: router.clone(),
837            receiver: router,
838        };
839        let quote = client
840            .request_binding_quote(&params)
841            .await
842            .unwrap();
843
844        assert_eq!(quote.base_token, token_in);
845        assert_eq!(quote.quote_token, token_out);
846        assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
847
848        // Assuming the BTC - WETH price doesn't change too much at the time of running this
849        assert!(quote.amount_out > BigUint::from(3000000u64));
850
851        // SWAP_SINGLE_SELECTOR = 0x4dcebcba;
852        assert_eq!(
853            quote
854                .quote_attributes
855                .get("calldata")
856                .unwrap()[..4],
857            Bytes::from_str("0x4dcebcba")
858                .unwrap()
859                .to_vec()
860        );
861        let partial_fill_offset_slice = quote
862            .quote_attributes
863            .get("partial_fill_offset")
864            .unwrap()
865            .as_ref();
866        let mut partial_fill_offset_array = [0u8; 8];
867        partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);
868
869        assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 12);
870    }
871
872    #[tokio::test]
873    #[ignore] // Requires network access and setting proper env vars
874    async fn test_bebop_quote_aggregate_order() {
875        // This will make a quote request similar to the previous test but with a very big amount
876        // We expect the Bebop Quote to have an aggregate order (split between different mms)
877        let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
878        let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
879        dotenv().expect("Missing .env file");
880        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
881
882        let client = BebopClient::new(
883            Chain::Ethereum,
884            HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
885            10.0, // $10 minimum TVL
886            auth.user,
887            auth.key,
888            HashSet::new(),
889            Duration::from_secs(30),
890        )
891        .unwrap();
892
893        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
894
895        let amount_in = BigUint::from_str("20_000_000_000").unwrap(); // 20k USDC
896        let params = GetAmountOutParams {
897            amount_in: amount_in.clone(),
898            token_in: token_in.clone(),
899            token_out: token_out.clone(),
900            sender: router.clone(),
901            receiver: router,
902        };
903        let quote = client
904            .request_binding_quote(&params)
905            .await
906            .unwrap();
907
908        assert_eq!(quote.base_token, token_in);
909        assert_eq!(quote.quote_token, token_out);
910        assert_eq!(quote.amount_in, amount_in);
911
912        // Assuming the USDC - ONDO price doesn't change too much at the time of running this
913        assert!(quote.amount_out > BigUint::from_str("18000000000000000000000").unwrap()); // ~19k ONDO
914
915        // SWAP_AGGREGATE_SELECTOR = 0xa2f74893;
916        assert_eq!(
917            quote
918                .quote_attributes
919                .get("calldata")
920                .unwrap()[..4],
921            Bytes::from_str("0xa2f74893")
922                .unwrap()
923                .to_vec()
924        );
925        let partial_fill_offset_slice = quote
926            .quote_attributes
927            .get("partial_fill_offset")
928            .unwrap()
929            .as_ref();
930        let mut partial_fill_offset_array = [0u8; 8];
931        partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);
932
933        // This is the only attribute that is significantly different for the Single and Aggregate
934        // Order
935        assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 2);
936    }
937
938    #[test]
939    fn test_process_bebop_quote_response_aggregate_order() {
940        let json =
941            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
942                .unwrap();
943        let quote_response: BebopQuoteResponse = serde_json::from_str(&json).unwrap();
944        let params = GetAmountOutParams {
945            amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
946            token_in: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
947            token_out: Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap(),
948            sender: Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap(),
949            receiver: Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap(),
950        };
951        let res = BebopClient::process_quote_response(quote_response, &params).unwrap();
952        assert_eq!(res.amount_out, BigUint::from_str("21700473797683400419007").unwrap());
953        assert_eq!(res.amount_in, BigUint::from_str("20000000000").unwrap());
954        assert_eq!(res.base_token, params.token_in);
955        assert_eq!(res.quote_token, params.token_out);
956    }
957
958    #[test]
959    fn test_process_bebop_quote_response_aggregate_order_with_multihop() {
960        let json = std::fs::read_to_string(
961            "src/rfq/protocols/bebop/test_responses/aggregate_order_with_multihop.json",
962        )
963        .unwrap();
964        let quote_response: BebopQuoteResponse = serde_json::from_str(&json).unwrap();
965        let params = GetAmountOutParams {
966            amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
967            token_in: Bytes::from_str("0xDEf1CA1fb7FBcDC777520aa7f396b4E015F497aB").unwrap(),
968            token_out: Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(),
969            sender: Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap(),
970            receiver: Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap(),
971        };
972        let res = BebopClient::process_quote_response(quote_response, &params).unwrap();
973        assert_eq!(res.amount_out, BigUint::from_str("11186653890").unwrap());
974        assert_eq!(res.amount_in, BigUint::from_str("43067495979235520920162").unwrap());
975        assert_eq!(res.base_token, params.token_in);
976        assert_eq!(res.quote_token, params.token_out);
977    }
978
979    /// Helper function to create a mock server that responds after a delay
980    async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
981        use tokio::io::AsyncWriteExt;
982
983        let listener = TcpListener::bind("127.0.0.1:0")
984            .await
985            .unwrap();
986        let addr = listener.local_addr().unwrap();
987
988        let json_response =
989            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
990                .unwrap();
991
992        tokio::spawn(async move {
993            while let Ok((mut stream, _)) = listener.accept().await {
994                let json_response_clone = json_response.clone();
995                tokio::spawn(async move {
996                    sleep(Duration::from_millis(delay_ms)).await;
997
998                    let response = format!(
999                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1000                        json_response_clone.len(),
1001                        json_response_clone
1002                    );
1003                    let _ = stream
1004                        .write_all(response.as_bytes())
1005                        .await;
1006                    let _ = stream.flush().await;
1007                    let _ = stream.shutdown().await;
1008                });
1009            }
1010        });
1011
1012        addr
1013    }
1014
1015    fn create_test_bebop_client(quote_endpoint: String, quote_timeout: Duration) -> BebopClient {
1016        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1017        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1018
1019        BebopClient {
1020            chain: Chain::Ethereum,
1021            price_ws: "ws://example.com".to_string(),
1022            quote_endpoint,
1023            tokens: HashSet::from([token_in, token_out]),
1024            tvl: 10.0,
1025            ws_user: "test_user".to_string(),
1026            ws_key: "test_key".to_string(),
1027            quote_tokens: HashSet::new(),
1028            quote_timeout,
1029        }
1030    }
1031
1032    /// Helper function to create test quote params matching aggregate_order.json
1033    fn create_test_quote_params() -> GetAmountOutParams {
1034        let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1035        let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
1036        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
1037
1038        GetAmountOutParams {
1039            amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
1040            token_in,
1041            token_out,
1042            sender: router.clone(),
1043            receiver: router,
1044        }
1045    }
1046
1047    #[tokio::test]
1048    async fn test_bebop_quote_timeout() {
1049        let addr = create_delayed_response_server(500).await;
1050
1051        // Test 1: Client with short timeout (200ms) - should timeout
1052        let client_short_timeout = create_test_bebop_client(
1053            format!("http://127.0.0.1:{}/quote", addr.port()),
1054            Duration::from_millis(200),
1055        );
1056        let params = create_test_quote_params();
1057
1058        let start = std::time::Instant::now();
1059        let result = client_short_timeout
1060            .request_binding_quote(&params)
1061            .await;
1062        let elapsed = start.elapsed();
1063
1064        assert!(result.is_err());
1065        let err = result.unwrap_err();
1066        match err {
1067            RFQError::ConnectionError(msg) => {
1068                assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
1069            }
1070            _ => panic!("Expected ConnectionError, got: {:?}", err),
1071        }
1072        assert!(
1073            elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
1074            "Expected timeout around 200ms, got: {:?}",
1075            elapsed
1076        );
1077
1078        // Test 2: Client with long timeout (1 seconds) - should wait and receive response
1079        // Note: With retry logic, we may need multiple attempts if the response is malformed,
1080        // so we need a longer timeout to account for retries
1081        let client_long_timeout = create_test_bebop_client(
1082            format!("http://127.0.0.1:{}/quote", addr.port()),
1083            Duration::from_secs(1),
1084        );
1085
1086        let result = client_long_timeout
1087            .request_binding_quote(&params)
1088            .await;
1089
1090        // Should succeed - the server waits 500ms which is within the 1s timeout
1091        assert!(result.is_ok(), "Expected success, got: {:?}", result);
1092        let quote = result.unwrap();
1093
1094        // Verify the quote matches what we expect from aggregate_order.json
1095        assert_eq!(quote.base_token, params.token_in);
1096        assert_eq!(quote.quote_token, params.token_out);
1097    }
1098
1099    /// Helper function to create a mock server that fails twice, then succeeds with
1100    /// aggregate_order.json
1101    async fn create_retry_server() -> (std::net::SocketAddr, Arc<Mutex<u32>>) {
1102        use std::sync::{Arc, Mutex};
1103
1104        use tokio::io::AsyncWriteExt;
1105
1106        let request_count = Arc::new(Mutex::new(0u32));
1107        let request_count_clone = request_count.clone();
1108
1109        let listener = TcpListener::bind("127.0.0.1:0")
1110            .await
1111            .unwrap();
1112        let addr = listener.local_addr().unwrap();
1113
1114        let json_response =
1115            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
1116                .unwrap();
1117
1118        tokio::spawn(async move {
1119            while let Ok((mut stream, _)) = listener.accept().await {
1120                let count_clone = request_count_clone.clone();
1121                let json_response_clone = json_response.clone();
1122                tokio::spawn(async move {
1123                    *count_clone.lock().unwrap() += 1;
1124                    let count = *count_clone.lock().unwrap();
1125                    println!("Mock server: Received request #{count}");
1126
1127                    if count <= 2 {
1128                        let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1129                        let _ = stream
1130                            .write_all(response.as_bytes())
1131                            .await;
1132                    } else {
1133                        let response = format!(
1134                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1135                            json_response_clone.len(),
1136                            json_response_clone
1137                        );
1138                        let _ = stream
1139                            .write_all(response.as_bytes())
1140                            .await;
1141                    }
1142                    let _ = stream.flush().await;
1143                    let _ = stream.shutdown().await;
1144                });
1145            }
1146        });
1147        (addr, request_count)
1148    }
1149
1150    #[tokio::test]
1151    async fn test_bebop_quote_retry_on_bad_response() {
1152        let (addr, request_count) = create_retry_server().await;
1153
1154        let client = create_test_bebop_client(
1155            format!("http://127.0.0.1:{}/quote", addr.port()),
1156            Duration::from_secs(5),
1157        );
1158        let params = create_test_quote_params();
1159        let result = client
1160            .request_binding_quote(&params)
1161            .await;
1162
1163        assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1164        let quote = result.unwrap();
1165
1166        // Verify the quote (amounts from aggregate_order.json)
1167        assert_eq!(quote.amount_in, BigUint::from_str("20000000000").unwrap());
1168        assert_eq!(quote.amount_out, BigUint::from_str("21700473797683400419007").unwrap());
1169
1170        // Verify exactly 3 requests were made (2 failures + 1 success)
1171        let final_count = *request_count.lock().unwrap();
1172        assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1173    }
1174}