Skip to main content

tycho_simulation/rfq/protocols/bebop/
client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address};
8use async_trait::async_trait;
9use futures::{stream::BoxStream, StreamExt};
10use http::Request;
11use num_bigint::BigUint;
12use prost::Message as ProstMessage;
13use reqwest::Client;
14use serde::{Deserialize, Serialize};
15use tokio::time::{sleep, timeout, Duration};
16use tokio_tungstenite::{
17    connect_async_with_config,
18    tungstenite::{handshake::client::generate_key, Message},
19};
20use tracing::{error, info, warn};
21use tycho_common::{
22    models::{protocol::GetAmountOutParams, Chain},
23    simulation::indicatively_priced::SignedQuote,
24    Bytes,
25};
26
27use crate::{
28    rfq::{
29        client::RFQClient,
30        errors::RFQError,
31        models::TimestampHeader,
32        protocols::bebop::models::{
33            BebopOrderToSign, BebopPriceData, BebopPricingUpdate, BebopQuoteResponse,
34        },
35    },
36    tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
37    tycho_common::models::protocol::{ProtocolComponent, ProtocolComponentState},
38};
39
40fn bytes_to_address(address: &Bytes) -> Result<Address, RFQError> {
41    if address.len() == 20 {
42        Ok(Address::from_slice(address))
43    } else {
44        Err(RFQError::InvalidInput(format!("Invalid ERC20 token address: {address:?}")))
45    }
46}
47
48/// Maps a Chain to its corresponding Bebop WebSocket URL
49fn chain_to_bebop_url(chain: Chain) -> Result<String, RFQError> {
50    let chain_path = match chain {
51        Chain::Ethereum => "ethereum",
52        Chain::Base => "base",
53        _ => return Err(RFQError::FatalError(format!("Unsupported chain: {chain:?}"))),
54    };
55    let url = format!("api.bebop.xyz/pmm/{chain_path}/v3");
56    Ok(url)
57}
58
59#[derive(Clone, Debug, Serialize, Deserialize)]
60pub struct BebopClient {
61    chain: Chain,
62    price_ws: String,
63    quote_endpoint: String,
64    // Tokens that we want prices for
65    tokens: HashSet<Bytes>,
66    // Min tvl value in the quote token.
67    tvl: f64,
68    // name header for authentication
69    #[serde(skip_serializing, default)]
70    ws_user: String,
71    // key header for authentication
72    #[serde(skip_serializing, default)]
73    ws_key: String,
74    // quote tokens to normalize to for TVL purposes. Should have the same prices.
75    quote_tokens: HashSet<Bytes>,
76    quote_timeout: Duration,
77}
78
79impl BebopClient {
80    pub const PROTOCOL_SYSTEM: &'static str = "rfq:bebop";
81
82    pub fn new(
83        chain: Chain,
84        tokens: HashSet<Bytes>,
85        tvl: f64,
86        ws_user: String,
87        ws_key: String,
88        quote_tokens: HashSet<Bytes>,
89        quote_timeout: Duration,
90    ) -> Result<Self, RFQError> {
91        let url = chain_to_bebop_url(chain)?;
92        Ok(Self {
93            price_ws: "wss://".to_string() + &url + "/pricing?format=protobuf",
94            quote_endpoint: "https://".to_string() + &url + "/quote",
95            tokens,
96            chain,
97            tvl,
98            ws_user,
99            ws_key,
100            quote_tokens,
101            quote_timeout,
102        })
103    }
104
105    fn create_component_with_state(
106        &self,
107        component_id: String,
108        tokens: Vec<tycho_common::Bytes>,
109        price_data: &BebopPriceData,
110        tvl: f64,
111    ) -> ComponentWithState {
112        let protocol_component = ProtocolComponent {
113            id: component_id.clone(),
114            protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
115            protocol_type_name: "bebop_pool".to_string(),
116            chain: self.chain,
117            tokens,
118            contract_addresses: vec![], // empty for RFQ
119            static_attributes: Default::default(),
120            change: Default::default(),
121            creation_tx: Default::default(),
122            created_at: Default::default(),
123        };
124
125        let mut attributes = HashMap::new();
126
127        // Store all bids and asks as JSON strings, since we cannot store arrays
128        // Convert flat arrays [price1, size1, price2, size2, ...] to pairs [(price1, size1),
129        // (price2, size2), ...]
130        if !price_data.bids.is_empty() {
131            let bids_pairs: Vec<(f32, f32)> = price_data
132                .bids
133                .chunks_exact(2)
134                .map(|chunk| (chunk[0], chunk[1]))
135                .collect();
136            let bids_json = serde_json::to_string(&bids_pairs).unwrap_or_default();
137            attributes.insert("bids".to_string(), bids_json.as_bytes().to_vec().into());
138        }
139        if !price_data.asks.is_empty() {
140            let asks_pairs: Vec<(f32, f32)> = price_data
141                .asks
142                .chunks_exact(2)
143                .map(|chunk| (chunk[0], chunk[1]))
144                .collect();
145            let asks_json = serde_json::to_string(&asks_pairs).unwrap_or_default();
146            attributes.insert("asks".to_string(), asks_json.as_bytes().to_vec().into());
147        }
148
149        ComponentWithState {
150            state: ProtocolComponentState::new(&component_id, attributes, HashMap::new()),
151            component: protocol_component,
152            component_tvl: Some(tvl),
153            entrypoints: vec![],
154        }
155    }
156
157    fn process_quote_response(
158        quote_response: BebopQuoteResponse,
159        params: &GetAmountOutParams,
160    ) -> Result<SignedQuote, RFQError> {
161        match quote_response {
162            BebopQuoteResponse::Success(quote) => {
163                quote.validate(params)?;
164
165                let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
166                quote_attributes.insert("calldata".into(), quote.tx.data);
167                quote_attributes.insert(
168                    "partial_fill_offset".into(),
169                    Bytes::from(
170                        quote
171                            .partial_fill_offset
172                            .to_be_bytes()
173                            .to_vec(),
174                    ),
175                );
176                let signed_quote = match quote.to_sign {
177                    BebopOrderToSign::Single(ref single) => SignedQuote {
178                        base_token: params.token_in.clone(),
179                        quote_token: params.token_out.clone(),
180                        amount_in: BigUint::from_str(&single.taker_amount).map_err(|_| {
181                            RFQError::ParsingError(format!(
182                                "Failed to parse amount in string: {}",
183                                single.taker_amount
184                            ))
185                        })?,
186                        amount_out: BigUint::from_str(&single.maker_amount).map_err(|_| {
187                            RFQError::ParsingError(format!(
188                                "Failed to parse amount out string: {}",
189                                single.maker_amount
190                            ))
191                        })?,
192                        quote_attributes,
193                    },
194                    BebopOrderToSign::Aggregate(aggregate) => {
195                        // Sum taker_amounts for taker_tokens matching the token_in
196                        let amount_in: BigUint = aggregate
197                            .taker_tokens
198                            .iter()
199                            .zip(&aggregate.taker_amounts)
200                            .flat_map(|(tokens, amounts)| {
201                                tokens
202                                    .iter()
203                                    .zip(amounts)
204                                    .filter_map(|(token, amount)| {
205                                        if token == &params.token_in {
206                                            BigUint::from_str(amount).ok()
207                                        } else {
208                                            None
209                                        }
210                                    })
211                            })
212                            .sum();
213
214                        // Sum maker_amounts for maker_tokens matching the token_out
215                        let amount_out: BigUint = aggregate
216                            .maker_tokens
217                            .iter()
218                            .zip(&aggregate.maker_amounts)
219                            .flat_map(|(tokens, amounts)| {
220                                tokens
221                                    .iter()
222                                    .zip(amounts)
223                                    .filter_map(|(token, amount)| {
224                                        if token == &params.token_out {
225                                            BigUint::from_str(amount).ok()
226                                        } else {
227                                            None
228                                        }
229                                    })
230                            })
231                            .sum();
232
233                        SignedQuote {
234                            base_token: params.token_in.clone(),
235                            quote_token: params.token_out.clone(),
236                            amount_in,
237                            amount_out,
238                            quote_attributes,
239                        }
240                    }
241                };
242
243                Ok(signed_quote)
244            }
245            BebopQuoteResponse::Error(err) => Err(RFQError::FatalError(format!(
246                "Bebop API error: code {} - {} (requestId: {})",
247                err.error.error_code, err.error.message, err.error.request_id
248            ))),
249        }
250    }
251}
252
253#[async_trait]
254impl RFQClient for BebopClient {
255    fn stream(
256        &self,
257    ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
258        let tokens = self.tokens.clone();
259        let url = self.price_ws.clone();
260        let tvl_threshold = self.tvl;
261        let name = self.ws_user.clone();
262        let authorization = self.ws_key.clone();
263        let client = self.clone();
264
265        Box::pin(async_stream::stream! {
266            let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
267            let mut reconnect_attempts = 0;
268            const MAX_RECONNECT_ATTEMPTS: u32 = 10;
269
270            loop {
271                let request = Request::builder()
272                    .method("GET")
273                    .uri(&url)
274                    .header("Host", "api.bebop.xyz")
275                    .header("Upgrade", "websocket")
276                    .header("Connection", "Upgrade")
277                    .header("Sec-WebSocket-Key", generate_key())
278                    .header("Sec-WebSocket-Version", "13")
279                    .header("name", &name)
280                    .header("Authorization", &authorization)
281                    .body(())
282                    .map_err(|_| RFQError::FatalError("Failed to build request".into()))?;
283
284                // Connect to Bebop WebSocket with custom headers
285                let (ws_stream, _) = match connect_async_with_config(request, None, false).await {
286                    Ok(connection) => {
287                        info!("Successfully connected to Bebop WebSocket");
288                        reconnect_attempts = 0; // Reset counter on successful connection
289                        connection
290                    },
291                    Err(e) => {
292                        reconnect_attempts += 1;
293                        error!("Failed to connect to Bebop WebSocket (attempt {}): {}", reconnect_attempts, e);
294
295                        if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
296                            yield Err(RFQError::ConnectionError(format!("Failed to connect after {MAX_RECONNECT_ATTEMPTS} attempts: {e}")));
297                            return;
298                        }
299
300                        let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
301                        info!("Retrying connection in {} seconds...", backoff_duration.as_secs());
302                        sleep(backoff_duration).await;
303                        continue;
304                    }
305                };
306
307                let (_, mut ws_receiver) = ws_stream.split();
308
309                // Message processing loop
310                while let Some(msg) = ws_receiver.next().await {
311                    match msg {
312                        Ok(Message::Binary(data)) => {
313                            match BebopPricingUpdate::decode(&data[..]) {
314                                Ok(protobuf_update) => {
315                                    let mut new_components = HashMap::new();
316
317                                    // Process all pairs directly from protobuf
318                                    for price_data in &protobuf_update.pairs {
319                                        let base_bytes = Bytes::from(price_data.base.clone());
320                                        let quote_bytes = Bytes::from(price_data.quote.clone());
321                                        if tokens.contains(&base_bytes) && tokens.contains(&quote_bytes) {
322                                            let pair_tokens = vec![
323                                                base_bytes.clone(), quote_bytes.clone()
324                                            ];
325
326                                            let mut quote_price_data: Option<&BebopPriceData> = None;
327                                            // The quote token is not one of the approved quote tokens
328                                            // Get the price, so we can normalize our TVL calculation
329                                            if !client.quote_tokens.contains(&quote_bytes) {
330                                                for approved_quote_token in &client.quote_tokens {
331                                                    // Look for a pair containing both our quote token and an approved token
332                                                    // Can be either QUOTE/APPROVED or APPROVED/QUOTE
333                                                    if let Some(quote_data) = protobuf_update.pairs.iter()
334                                                        .find(|p| {
335                                                            (p.base == quote_bytes.as_ref() && p.quote == approved_quote_token.as_ref()) ||
336                                                            (p.quote == quote_bytes.as_ref() && p.base == approved_quote_token.as_ref())
337                                                        }) {
338                                                        quote_price_data = Some(quote_data);
339                                                        break;
340                                                    }
341                                                }
342
343                                                // Quote token doesn't have price levels in approved quote tokens.
344                                                // Skip.
345                                                if quote_price_data.is_none() {
346                                                    warn!("Quote token {} does not have price levels in approved quote token. Skipping.", hex::encode(&quote_bytes));
347                                                    continue;
348                                                }
349                                            }
350
351                                            let tvl = price_data.calculate_tvl(quote_price_data);
352                                            if tvl < tvl_threshold {
353                                                continue;
354                                            }
355
356                                            let pair_str = format!("bebop_{}/{}", hex::encode(&base_bytes), hex::encode(&quote_bytes));
357                                            let component_id = format!("{}", keccak256(pair_str.as_bytes()));
358                                            let component_with_state = client.create_component_with_state(
359                                                component_id.clone(),
360                                                pair_tokens,
361                                                price_data,
362                                                tvl
363                                            );
364                                            new_components.insert(component_id, component_with_state);
365                                        }
366                                    }
367
368                                    // Find components that were removed (existed before but not in this update)
369                                    // This includes components with no bids or asks, since they are filtered
370                                    // out by the tvl threshold.
371                                    let removed_components: HashMap<String, ProtocolComponent> = current_components
372                                        .iter()
373                                        .filter(|&(id, _)| !new_components.contains_key(id))
374                                        .map(|(k, v)| (k.clone(), v.component.clone()))
375                                        .collect();
376
377                                    // Update our current state
378                                    current_components = new_components.clone();
379
380                                    let snapshot = Snapshot {
381                                        states: new_components,
382                                        vm_storage: HashMap::new(),
383                                    };
384                                    let timestamp = SystemTime::now().duration_since(
385                                        SystemTime::UNIX_EPOCH
386                                    ).map_err(
387                                        |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
388                                    )?.as_secs();
389
390                                    let msg = StateSyncMessage::<TimestampHeader> {
391                                        header: TimestampHeader { timestamp },
392                                        snapshots: snapshot,
393                                        deltas: None, // Deltas are always None - all the changes are absolute
394                                        removed_components,
395                                    };
396
397                                    // Yield one message containing all updated pairs
398                                    yield Ok(("bebop".to_string(), msg));
399                                },
400                                Err(e) => {
401                                    error!("Failed to parse protobuf message: {}", e);
402                                    break;
403                                }
404                            }
405                        }
406                        Ok(Message::Close(_)) => {
407                            info!("WebSocket connection closed by server");
408                            break;
409                        }
410                        Err(e) => {
411                            error!("WebSocket error: {}", e);
412                            break;
413                        }
414                        _ => {} // Ignore other message types
415                    }
416                }
417
418                // If we're here, the message loop exited - always attempt to reconnect
419                reconnect_attempts += 1;
420                if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
421                    yield Err(RFQError::ConnectionError(format!("Connection failed after {MAX_RECONNECT_ATTEMPTS} attempts")));
422                    return;
423                }
424
425                let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
426                info!("Reconnecting in {} seconds (attempt {})...", backoff_duration.as_secs(), reconnect_attempts);
427                sleep(backoff_duration).await;
428                // Continue to the next iteration of the main loop
429            }
430        })
431    }
432
433    async fn request_binding_quote(
434        &self,
435        params: &GetAmountOutParams,
436    ) -> Result<SignedQuote, RFQError> {
437        let sell_token = bytes_to_address(&params.token_in)?.to_string();
438        let buy_token = bytes_to_address(&params.token_out)?.to_string();
439        let sell_amount = params.amount_in.to_string();
440        let sender = bytes_to_address(&params.sender)?.to_string();
441        let receiver = bytes_to_address(&params.receiver)?.to_string();
442
443        let url = self.quote_endpoint.clone();
444
445        let client = Client::new();
446
447        let start_time = std::time::Instant::now();
448        const MAX_RETRIES: u32 = 3;
449        let mut last_error = None;
450
451        for attempt in 0..MAX_RETRIES {
452            // Check if we have time remaining for this attempt
453            let elapsed = start_time.elapsed();
454            if elapsed >= self.quote_timeout {
455                return Err(last_error.unwrap_or_else(|| {
456                    RFQError::ConnectionError(format!(
457                        "Bebop quote request timed out after {} seconds",
458                        self.quote_timeout.as_secs()
459                    ))
460                }));
461            }
462
463            let remaining_time = self.quote_timeout - elapsed;
464
465            let request = client
466                .get(&url)
467                .query(&[
468                    ("sell_tokens", sell_token.clone()),
469                    ("buy_tokens", buy_token.clone()),
470                    ("sell_amounts", sell_amount.clone()),
471                    ("taker_address", sender.clone()),
472                    ("receiver_address", receiver.clone()),
473                    ("approval_type", "Standard".into()),
474                    ("skip_validation", "true".into()),
475                    ("skip_taker_checks", "true".into()),
476                    ("gasless", "false".into()),
477                    ("expiry_type", "standard".into()),
478                    ("fee", "0".into()),
479                    ("is_ui", "false".into()),
480                    ("source", self.ws_user.clone()),
481                ])
482                .header("accept", "application/json")
483                .header("name", &self.ws_user)
484                .header("source-auth", &self.ws_key)
485                .header("Authorization", &self.ws_key);
486
487            let response = match timeout(remaining_time, request.send()).await {
488                Ok(Ok(resp)) => resp,
489                Ok(Err(e)) => {
490                    warn!(
491                        "Bebop quote request failed (attempt {}/{}): {}",
492                        attempt + 1,
493                        MAX_RETRIES,
494                        e
495                    );
496                    last_error = Some(RFQError::ConnectionError(format!(
497                        "Failed to send Bebop quote request: {e}"
498                    )));
499                    if attempt < MAX_RETRIES - 1 {
500                        continue;
501                    } else {
502                        return Err(last_error.unwrap());
503                    }
504                }
505                Err(_) => {
506                    return Err(RFQError::ConnectionError(format!(
507                        "Bebop quote request timed out after {} seconds",
508                        self.quote_timeout.as_secs()
509                    )));
510                }
511            };
512
513            let quote_response = match response
514                .json::<BebopQuoteResponse>()
515                .await
516            {
517                Ok(resp) => resp,
518                Err(e) => {
519                    warn!(
520                        "Bebop quote response parsing failed (attempt {}/{}): {}",
521                        attempt + 1,
522                        MAX_RETRIES,
523                        e
524                    );
525                    last_error = Some(RFQError::ParsingError(format!(
526                        "Failed to parse Bebop quote response: {e}"
527                    )));
528                    if attempt < MAX_RETRIES - 1 {
529                        sleep(Duration::from_millis(100)).await;
530                        continue;
531                    } else {
532                        return Err(last_error.unwrap());
533                    }
534                }
535            };
536
537            return Self::process_quote_response(quote_response, params);
538        }
539
540        Err(last_error.unwrap_or_else(|| {
541            RFQError::ConnectionError("Bebop quote request failed after retries".to_string())
542        }))
543    }
544}
545
546#[cfg(test)]
547mod tests {
548    use std::{
549        sync::{Arc, Mutex},
550        time::Duration,
551    };
552
553    use dotenv::dotenv;
554    use futures::SinkExt;
555    use tokio::{net::TcpListener, time::timeout};
556    use tokio_tungstenite::accept_async;
557
558    use super::*;
559    use crate::rfq::constants::get_bebop_auth;
560
561    #[tokio::test]
562    #[ignore] // Requires network access and setting proper env vars
563    async fn test_bebop_websocket_connection() {
564        // We test with quote tokens that are not USDC in order to ensure our normalization works
565        // fine
566        let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
567        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
568
569        dotenv().expect("Missing .env file");
570        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
571
572        let quote_tokens = HashSet::from([
573            // Use addresses we forgot to checksum (to test checksumming)
574            Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(), // USDC
575            Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), // USDT
576        ]);
577
578        let client = BebopClient::new(
579            Chain::Ethereum,
580            HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
581            10.0, // $10 minimum TVL
582            auth.user,
583            auth.key,
584            quote_tokens,
585            Duration::from_secs(30),
586        )
587        .unwrap();
588
589        let mut stream = client.stream();
590
591        // Test connection and message reception with timeout
592        let result = timeout(Duration::from_secs(10), async {
593            let mut message_count = 0;
594            let max_messages = 5;
595
596            while let Some(result) = stream.next().await {
597                match result {
598                    Ok((component_id, msg)) => {
599                        println!("Received message with ID: {component_id}");
600
601                        assert!(!component_id.is_empty());
602                        assert_eq!(component_id, "bebop");
603                        assert!(msg.header.timestamp > 0);
604                        assert!(!msg.snapshots.states.is_empty());
605
606                        let snapshot = &msg.snapshots;
607
608                        // We got at least one component
609                        assert!(!snapshot.states.is_empty());
610
611                        println!("Received {} components in this message", snapshot.states.len());
612                        for (id, component_with_state) in &snapshot.states {
613                            assert_eq!(
614                                component_with_state
615                                    .component
616                                    .protocol_system,
617                                "rfq:bebop"
618                            );
619                            assert_eq!(
620                                component_with_state
621                                    .component
622                                    .protocol_type_name,
623                                "bebop_pool"
624                            );
625                            assert_eq!(component_with_state.component.chain, Chain::Ethereum);
626
627                            let attributes = &component_with_state.state.attributes;
628
629                            // Check that bids and asks exist and have non-empty byte strings
630                            assert!(attributes.contains_key("bids"));
631                            assert!(attributes.contains_key("asks"));
632                            assert!(!attributes["bids"].is_empty());
633                            assert!(!attributes["asks"].is_empty());
634
635                            if let Some(tvl) = component_with_state.component_tvl {
636                                assert!(tvl >= 0.0);
637                                println!("Component {id} TVL: ${tvl:.2}");
638                            }
639                        }
640
641                        message_count += 1;
642                        if message_count >= max_messages {
643                            break;
644                        }
645                    }
646                    Err(e) => {
647                        panic!("Stream error: {e}");
648                    }
649                }
650            }
651
652            assert!(message_count > 0, "Should have received at least one message");
653            println!("Successfully received {message_count} messages");
654        })
655        .await;
656
657        match result {
658            Ok(_) => println!("Test completed successfully"),
659            Err(_) => panic!("Test timed out - no messages received within 10 seconds"),
660        }
661    }
662
663    #[tokio::test]
664    async fn test_websocket_reconnection() {
665        // Start a mock WebSocket server that will drop connections intermittently
666        let listener = TcpListener::bind("127.0.0.1:0")
667            .await
668            .unwrap();
669        let addr = listener.local_addr().unwrap();
670
671        // Creates a thread-safe counter.
672        let connection_count = Arc::new(Mutex::new(0u32));
673
674        // We must clone - since we want to read the original value at the end of the test.
675        let connection_count_clone = connection_count.clone();
676
677        tokio::spawn(async move {
678            while let Ok((stream, _)) = listener.accept().await {
679                *connection_count_clone.lock().unwrap() += 1;
680                let count = *connection_count_clone.lock().unwrap();
681                println!("Mock server: Connection #{count} established");
682
683                tokio::spawn(async move {
684                    if let Ok(ws_stream) = accept_async(stream).await {
685                        let (mut ws_sender, _ws_receiver) = ws_stream.split();
686
687                        // Create test protobuf message
688                        let weth_addr =
689                            hex::decode("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
690                        let usdc_addr =
691                            hex::decode("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
692
693                        let test_price_data = BebopPriceData {
694                            base: weth_addr,
695                            quote: usdc_addr,
696                            last_update_ts: 1752617378,
697                            bids: vec![3070.05f32, 0.325717f32],
698                            asks: vec![3070.527f32, 0.325717f32],
699                        };
700
701                        let pricing_update = BebopPricingUpdate { pairs: vec![test_price_data] };
702
703                        let test_message = pricing_update.encode_to_vec();
704
705                        if count == 1 {
706                            // First connection: Send message successfully, then drop
707                            println!("Mock server: Connection #1 - sending message then dropping.");
708                            let _ = ws_sender
709                                .send(Message::Binary(test_message.clone().into()))
710                                .await;
711
712                            // Give time for message to be processed, then drop the connection.
713                            tokio::time::sleep(Duration::from_millis(100)).await;
714                            println!("Mock server: Dropping connection #1");
715                            let _ = ws_sender.close().await;
716                        } else if count == 2 {
717                            // Second connection: Send message successfully and maintain connection
718                            println!("Mock server: Connection #2 - maintaining stable connection.");
719                            let _ = ws_sender
720                                .send(Message::Binary(test_message.clone().into()))
721                                .await;
722                        }
723                    }
724                });
725            }
726        });
727
728        // Wait a moment for the server to start
729        tokio::time::sleep(Duration::from_millis(50)).await;
730
731        let mut test_quote_tokens = HashSet::new();
732        test_quote_tokens
733            .insert(Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap());
734
735        let tokens_formatted = vec![
736            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap(),
737            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
738        ];
739
740        // Bypass the new() constructor to mock the URL to point to our mock server.
741        let client = BebopClient {
742            chain: Chain::Ethereum,
743            price_ws: format!("ws://127.0.0.1:{}", addr.port()),
744            tokens: tokens_formatted.into_iter().collect(),
745            tvl: 1000.0,
746            ws_user: "test_user".to_string(),
747            ws_key: "test_key".to_string(),
748            quote_tokens: test_quote_tokens,
749            quote_endpoint: "".to_string(),
750            quote_timeout: Duration::from_secs(5),
751        };
752
753        let start_time = std::time::Instant::now();
754        let mut successful_messages = 0;
755        let mut connection_errors = 0;
756        let mut first_message_received = false;
757        let mut second_message_received = false;
758
759        // Expected flow:
760        // 1. Receive first message successfully
761        // 2. Connection drops
762        // 3. Client reconnects
763        // 4. Receive second message successfully
764        // Timeout if two messages are not received within 5 seconds.
765        while start_time.elapsed() < Duration::from_secs(5) && successful_messages < 2 {
766            match timeout(Duration::from_millis(1000), client.stream().next()).await {
767                Ok(Some(result)) => match result {
768                    Ok((_component_id, _message)) => {
769                        successful_messages += 1;
770                        println!("Received successful message {successful_messages}");
771
772                        if successful_messages == 1 {
773                            first_message_received = true;
774                            println!("First message received - connection should drop after this.");
775                        } else if successful_messages == 2 {
776                            second_message_received = true;
777                            println!("Second message received after reconnection.");
778                        }
779                    }
780                    Err(e) => {
781                        connection_errors += 1;
782                        println!("Connection error during reconnection: {e:?}");
783                    }
784                },
785                Ok(None) => {
786                    panic!("Stream ended unexpectedly");
787                }
788                Err(_) => {
789                    println!("Timeout waiting for message (normal during reconnections)");
790                    continue;
791                }
792            }
793        }
794
795        let final_connection_count = *connection_count.lock().unwrap();
796
797        // 1. Exactly 2 connection attempts (initial + reconnect)
798        // 2. Exactly 2 successful messages (one before drop, one after reconnect)
799
800        assert_eq!(final_connection_count, 2);
801        assert!(first_message_received);
802        assert!(second_message_received);
803        assert_eq!(connection_errors, 0);
804        assert_eq!(successful_messages, 2);
805    }
806
807    #[tokio::test]
808    #[ignore] // Requires network access and setting proper env vars
809    async fn test_bebop_quote_single_order() {
810        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
811        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
812        dotenv().expect("Missing .env file");
813        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
814
815        let client = BebopClient::new(
816            Chain::Ethereum,
817            HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
818            10.0, // $10 minimum TVL
819            auth.user,
820            auth.key,
821            HashSet::new(),
822            Duration::from_secs(30),
823        )
824        .unwrap();
825
826        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
827
828        let params = GetAmountOutParams {
829            amount_in: BigUint::from(1_000000000000000000u64),
830            token_in: token_in.clone(),
831            token_out: token_out.clone(),
832            sender: router.clone(),
833            receiver: router,
834        };
835        let quote = client
836            .request_binding_quote(&params)
837            .await
838            .unwrap();
839
840        assert_eq!(quote.base_token, token_in);
841        assert_eq!(quote.quote_token, token_out);
842        assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
843
844        // Assuming the BTC - WETH price doesn't change too much at the time of running this
845        assert!(quote.amount_out > BigUint::from(3000000u64));
846
847        // SWAP_SINGLE_SELECTOR = 0x4dcebcba;
848        assert_eq!(
849            quote
850                .quote_attributes
851                .get("calldata")
852                .unwrap()[..4],
853            Bytes::from_str("0x4dcebcba")
854                .unwrap()
855                .to_vec()
856        );
857        let partial_fill_offset_slice = quote
858            .quote_attributes
859            .get("partial_fill_offset")
860            .unwrap()
861            .as_ref();
862        let mut partial_fill_offset_array = [0u8; 8];
863        partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);
864
865        assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 12);
866    }
867
868    #[tokio::test]
869    #[ignore] // Requires network access and setting proper env vars
870    async fn test_bebop_quote_aggregate_order() {
871        // This will make a quote request similar to the previous test but with a very big amount
872        // We expect the Bebop Quote to have an aggregate order (split between different mms)
873        let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
874        let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
875        dotenv().expect("Missing .env file");
876        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
877
878        let client = BebopClient::new(
879            Chain::Ethereum,
880            HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
881            10.0, // $10 minimum TVL
882            auth.user,
883            auth.key,
884            HashSet::new(),
885            Duration::from_secs(30),
886        )
887        .unwrap();
888
889        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
890
891        let amount_in = BigUint::from_str("20_000_000_000").unwrap(); // 20k USDC
892        let params = GetAmountOutParams {
893            amount_in: amount_in.clone(),
894            token_in: token_in.clone(),
895            token_out: token_out.clone(),
896            sender: router.clone(),
897            receiver: router,
898        };
899        let quote = client
900            .request_binding_quote(&params)
901            .await
902            .unwrap();
903
904        assert_eq!(quote.base_token, token_in);
905        assert_eq!(quote.quote_token, token_out);
906        assert_eq!(quote.amount_in, amount_in);
907
908        // Assuming the USDC - ONDO price doesn't change too much at the time of running this
909        assert!(quote.amount_out > BigUint::from_str("18000000000000000000000").unwrap()); // ~19k ONDO
910
911        // SWAP_AGGREGATE_SELECTOR = 0xa2f74893;
912        assert_eq!(
913            quote
914                .quote_attributes
915                .get("calldata")
916                .unwrap()[..4],
917            Bytes::from_str("0xa2f74893")
918                .unwrap()
919                .to_vec()
920        );
921        let partial_fill_offset_slice = quote
922            .quote_attributes
923            .get("partial_fill_offset")
924            .unwrap()
925            .as_ref();
926        let mut partial_fill_offset_array = [0u8; 8];
927        partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);
928
929        // This is the only attribute that is significantly different for the Single and Aggregate
930        // Order
931        assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 2);
932    }
933
934    #[test]
935    fn test_process_bebop_quote_response_aggregate_order() {
936        let json =
937            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
938                .unwrap();
939        let quote_response: BebopQuoteResponse = serde_json::from_str(&json).unwrap();
940        let params = GetAmountOutParams {
941            amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
942            token_in: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
943            token_out: Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap(),
944            sender: Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap(),
945            receiver: Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap(),
946        };
947        let res = BebopClient::process_quote_response(quote_response, &params).unwrap();
948        assert_eq!(res.amount_out, BigUint::from_str("21700473797683400419007").unwrap());
949        assert_eq!(res.amount_in, BigUint::from_str("20000000000").unwrap());
950        assert_eq!(res.base_token, params.token_in);
951        assert_eq!(res.quote_token, params.token_out);
952    }
953
954    #[test]
955    fn test_process_bebop_quote_response_aggregate_order_with_multihop() {
956        let json = std::fs::read_to_string(
957            "src/rfq/protocols/bebop/test_responses/aggregate_order_with_multihop.json",
958        )
959        .unwrap();
960        let quote_response: BebopQuoteResponse = serde_json::from_str(&json).unwrap();
961        let params = GetAmountOutParams {
962            amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
963            token_in: Bytes::from_str("0xDEf1CA1fb7FBcDC777520aa7f396b4E015F497aB").unwrap(),
964            token_out: Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(),
965            sender: Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap(),
966            receiver: Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap(),
967        };
968        let res = BebopClient::process_quote_response(quote_response, &params).unwrap();
969        assert_eq!(res.amount_out, BigUint::from_str("11186653890").unwrap());
970        assert_eq!(res.amount_in, BigUint::from_str("43067495979235520920162").unwrap());
971        assert_eq!(res.base_token, params.token_in);
972        assert_eq!(res.quote_token, params.token_out);
973    }
974
975    /// Helper function to create a mock server that responds after a delay
976    async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
977        use tokio::io::AsyncWriteExt;
978
979        let listener = TcpListener::bind("127.0.0.1:0")
980            .await
981            .unwrap();
982        let addr = listener.local_addr().unwrap();
983
984        let json_response =
985            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
986                .unwrap();
987
988        tokio::spawn(async move {
989            while let Ok((mut stream, _)) = listener.accept().await {
990                let json_response_clone = json_response.clone();
991                tokio::spawn(async move {
992                    sleep(Duration::from_millis(delay_ms)).await;
993
994                    let response = format!(
995                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
996                        json_response_clone.len(),
997                        json_response_clone
998                    );
999                    let _ = stream
1000                        .write_all(response.as_bytes())
1001                        .await;
1002                    let _ = stream.flush().await;
1003                    let _ = stream.shutdown().await;
1004                });
1005            }
1006        });
1007
1008        addr
1009    }
1010
1011    fn create_test_bebop_client(quote_endpoint: String, quote_timeout: Duration) -> BebopClient {
1012        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1013        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1014
1015        BebopClient {
1016            chain: Chain::Ethereum,
1017            price_ws: "ws://example.com".to_string(),
1018            quote_endpoint,
1019            tokens: HashSet::from([token_in, token_out]),
1020            tvl: 10.0,
1021            ws_user: "test_user".to_string(),
1022            ws_key: "test_key".to_string(),
1023            quote_tokens: HashSet::new(),
1024            quote_timeout,
1025        }
1026    }
1027
1028    /// Helper function to create test quote params matching aggregate_order.json
1029    fn create_test_quote_params() -> GetAmountOutParams {
1030        let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1031        let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
1032        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
1033
1034        GetAmountOutParams {
1035            amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
1036            token_in,
1037            token_out,
1038            sender: router.clone(),
1039            receiver: router,
1040        }
1041    }
1042
1043    #[tokio::test]
1044    async fn test_bebop_quote_timeout() {
1045        let addr = create_delayed_response_server(500).await;
1046
1047        // Test 1: Client with short timeout (200ms) - should timeout
1048        let client_short_timeout = create_test_bebop_client(
1049            format!("http://127.0.0.1:{}/quote", addr.port()),
1050            Duration::from_millis(200),
1051        );
1052        let params = create_test_quote_params();
1053
1054        let start = std::time::Instant::now();
1055        let result = client_short_timeout
1056            .request_binding_quote(&params)
1057            .await;
1058        let elapsed = start.elapsed();
1059
1060        assert!(result.is_err());
1061        let err = result.unwrap_err();
1062        match err {
1063            RFQError::ConnectionError(msg) => {
1064                assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
1065            }
1066            _ => panic!("Expected ConnectionError, got: {:?}", err),
1067        }
1068        assert!(
1069            elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
1070            "Expected timeout around 200ms, got: {:?}",
1071            elapsed
1072        );
1073
1074        // Test 2: Client with long timeout (1 seconds) - should wait and receive response
1075        // Note: With retry logic, we may need multiple attempts if the response is malformed,
1076        // so we need a longer timeout to account for retries
1077        let client_long_timeout = create_test_bebop_client(
1078            format!("http://127.0.0.1:{}/quote", addr.port()),
1079            Duration::from_secs(1),
1080        );
1081
1082        let result = client_long_timeout
1083            .request_binding_quote(&params)
1084            .await;
1085
1086        // Should succeed - the server waits 500ms which is within the 1s timeout
1087        assert!(result.is_ok(), "Expected success, got: {:?}", result);
1088        let quote = result.unwrap();
1089
1090        // Verify the quote matches what we expect from aggregate_order.json
1091        assert_eq!(quote.base_token, params.token_in);
1092        assert_eq!(quote.quote_token, params.token_out);
1093    }
1094
1095    /// Helper function to create a mock server that fails twice, then succeeds with
1096    /// aggregate_order.json
1097    async fn create_retry_server() -> (std::net::SocketAddr, Arc<Mutex<u32>>) {
1098        use std::sync::{Arc, Mutex};
1099
1100        use tokio::io::AsyncWriteExt;
1101
1102        let request_count = Arc::new(Mutex::new(0u32));
1103        let request_count_clone = request_count.clone();
1104
1105        let listener = TcpListener::bind("127.0.0.1:0")
1106            .await
1107            .unwrap();
1108        let addr = listener.local_addr().unwrap();
1109
1110        let json_response =
1111            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
1112                .unwrap();
1113
1114        tokio::spawn(async move {
1115            while let Ok((mut stream, _)) = listener.accept().await {
1116                let count_clone = request_count_clone.clone();
1117                let json_response_clone = json_response.clone();
1118                tokio::spawn(async move {
1119                    *count_clone.lock().unwrap() += 1;
1120                    let count = *count_clone.lock().unwrap();
1121                    println!("Mock server: Received request #{count}");
1122
1123                    if count <= 2 {
1124                        let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1125                        let _ = stream
1126                            .write_all(response.as_bytes())
1127                            .await;
1128                    } else {
1129                        let response = format!(
1130                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1131                            json_response_clone.len(),
1132                            json_response_clone
1133                        );
1134                        let _ = stream
1135                            .write_all(response.as_bytes())
1136                            .await;
1137                    }
1138                    let _ = stream.flush().await;
1139                    let _ = stream.shutdown().await;
1140                });
1141            }
1142        });
1143        (addr, request_count)
1144    }
1145
1146    #[tokio::test]
1147    async fn test_bebop_quote_retry_on_bad_response() {
1148        let (addr, request_count) = create_retry_server().await;
1149
1150        let client = create_test_bebop_client(
1151            format!("http://127.0.0.1:{}/quote", addr.port()),
1152            Duration::from_secs(5),
1153        );
1154        let params = create_test_quote_params();
1155        let result = client
1156            .request_binding_quote(&params)
1157            .await;
1158
1159        assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1160        let quote = result.unwrap();
1161
1162        // Verify the quote (amounts from aggregate_order.json)
1163        assert_eq!(quote.amount_in, BigUint::from_str("20000000000").unwrap());
1164        assert_eq!(quote.amount_out, BigUint::from_str("21700473797683400419007").unwrap());
1165
1166        // Verify exactly 3 requests were made (2 failures + 1 success)
1167        let final_count = *request_count.lock().unwrap();
1168        assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1169    }
1170
1171    #[test]
1172    fn test_bebop_client_serialize_deserialize_roundtrip() {
1173        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1174        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1175        let quote_token = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1176
1177        let original = BebopClient {
1178            chain: Chain::Ethereum,
1179            price_ws: "wss://api.bebop.xyz/pricing".to_string(),
1180            quote_endpoint: "https://api.bebop.xyz/quote".to_string(),
1181            tokens: HashSet::from([token_in.clone(), token_out.clone()]),
1182            tvl: 50.5,
1183            ws_user: "secret_user".to_string(),
1184            ws_key: "secret_key".to_string(),
1185            quote_tokens: HashSet::from([quote_token.clone()]),
1186            quote_timeout: Duration::from_millis(5500),
1187        };
1188
1189        let serialized = serde_json::to_string(&original).unwrap();
1190        let deserialized: BebopClient = serde_json::from_str(&serialized).unwrap();
1191
1192        // Fields that should round-trip correctly
1193        assert_eq!(deserialized.chain, original.chain);
1194        assert_eq!(deserialized.price_ws, original.price_ws);
1195        assert_eq!(deserialized.quote_endpoint, original.quote_endpoint);
1196        assert_eq!(deserialized.tokens, original.tokens);
1197        assert_eq!(deserialized.tvl, original.tvl);
1198        assert_eq!(deserialized.quote_tokens, original.quote_tokens);
1199        assert_eq!(deserialized.quote_timeout, original.quote_timeout);
1200
1201        // ws_user and ws_key should NOT round-trip (skip_serializing + default)
1202        assert_eq!(deserialized.ws_user, "");
1203        assert_eq!(deserialized.ws_key, "");
1204        assert_ne!(deserialized.ws_user, original.ws_user);
1205        assert_ne!(deserialized.ws_key, original.ws_key);
1206    }
1207
1208    #[test]
1209    fn test_bebop_client_deserialize_with_credentials() {
1210        // When ws_user and ws_key are provided in JSON, they should be deserialized
1211        // (skip_serializing only affects serialization, not deserialization)
1212        let json = r#"{
1213            "chain": "ethereum",
1214            "price_ws": "wss://api.bebop.xyz/pricing",
1215            "quote_endpoint": "https://api.bebop.xyz/quote",
1216            "tokens": [],
1217            "tvl": 10.0,
1218            "ws_user": "provided_user",
1219            "ws_key": "provided_key",
1220            "quote_tokens": [],
1221            "quote_timeout": {"secs": 30, "nanos": 0}
1222        }"#;
1223
1224        let client: BebopClient = serde_json::from_str(json).unwrap();
1225
1226        // Credentials should be deserialized from JSON
1227        assert_eq!(client.ws_user, "provided_user");
1228        assert_eq!(client.ws_key, "provided_key");
1229    }
1230}