Skip to main content

tycho_simulation/rfq/protocols/bebop/
client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    time::SystemTime,
5};
6
7use alloy::primitives::{utils::keccak256, Address};
8use async_trait::async_trait;
9use futures::{stream::BoxStream, StreamExt};
10use http::Request;
11use num_bigint::BigUint;
12use prost::Message as ProstMessage;
13use reqwest::Client;
14use serde::{Deserialize, Serialize};
15use tokio::time::{sleep, timeout, Duration};
16use tokio_tungstenite::{
17    connect_async_with_config,
18    tungstenite::{handshake::client::generate_key, Message},
19};
20use tracing::{error, info, warn};
21use tycho_common::{
22    models::{protocol::GetAmountOutParams, Chain},
23    simulation::indicatively_priced::SignedQuote,
24    Bytes,
25};
26
27use crate::{
28    rfq::{
29        client::RFQClient,
30        errors::RFQError,
31        models::TimestampHeader,
32        protocols::bebop::models::{
33            BebopOrderToSign, BebopPriceData, BebopPricingUpdate, BebopQuoteResponse,
34        },
35    },
36    tycho_client::feed::synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
37    tycho_common::dto::{ProtocolComponent, ResponseProtocolState},
38};
39
40fn bytes_to_address(address: &Bytes) -> Result<Address, RFQError> {
41    if address.len() == 20 {
42        Ok(Address::from_slice(address))
43    } else {
44        Err(RFQError::InvalidInput(format!("Invalid ERC20 token address: {address:?}")))
45    }
46}
47
48/// Maps a Chain to its corresponding Bebop WebSocket URL
49fn chain_to_bebop_url(chain: Chain) -> Result<String, RFQError> {
50    let chain_path = match chain {
51        Chain::Ethereum => "ethereum",
52        Chain::Base => "base",
53        _ => return Err(RFQError::FatalError(format!("Unsupported chain: {chain:?}"))),
54    };
55    let url = format!("api.bebop.xyz/pmm/{chain_path}/v3");
56    Ok(url)
57}
58
59#[derive(Clone, Debug, Serialize, Deserialize)]
60pub struct BebopClient {
61    chain: Chain,
62    price_ws: String,
63    quote_endpoint: String,
64    // Tokens that we want prices for
65    tokens: HashSet<Bytes>,
66    // Min tvl value in the quote token.
67    tvl: f64,
68    // name header for authentication
69    #[serde(skip_serializing, default)]
70    ws_user: String,
71    // key header for authentication
72    #[serde(skip_serializing, default)]
73    ws_key: String,
74    // quote tokens to normalize to for TVL purposes. Should have the same prices.
75    quote_tokens: HashSet<Bytes>,
76    quote_timeout: Duration,
77}
78
79impl BebopClient {
80    pub const PROTOCOL_SYSTEM: &'static str = "rfq:bebop";
81
82    pub fn new(
83        chain: Chain,
84        tokens: HashSet<Bytes>,
85        tvl: f64,
86        ws_user: String,
87        ws_key: String,
88        quote_tokens: HashSet<Bytes>,
89        quote_timeout: Duration,
90    ) -> Result<Self, RFQError> {
91        let url = chain_to_bebop_url(chain)?;
92        Ok(Self {
93            price_ws: "wss://".to_string() + &url + "/pricing?format=protobuf",
94            quote_endpoint: "https://".to_string() + &url + "/quote",
95            tokens,
96            chain,
97            tvl,
98            ws_user,
99            ws_key,
100            quote_tokens,
101            quote_timeout,
102        })
103    }
104
105    fn create_component_with_state(
106        &self,
107        component_id: String,
108        tokens: Vec<tycho_common::Bytes>,
109        price_data: &BebopPriceData,
110        tvl: f64,
111    ) -> ComponentWithState {
112        let protocol_component = ProtocolComponent {
113            id: component_id.clone(),
114            protocol_system: Self::PROTOCOL_SYSTEM.to_string(),
115            protocol_type_name: "bebop_pool".to_string(),
116            chain: self.chain.into(),
117            tokens,
118            contract_ids: vec![], // empty for RFQ
119            static_attributes: Default::default(),
120            change: Default::default(),
121            creation_tx: Default::default(),
122            created_at: Default::default(),
123        };
124
125        let mut attributes = HashMap::new();
126
127        // Store all bids and asks as JSON strings, since we cannot store arrays
128        // Convert flat arrays [price1, size1, price2, size2, ...] to pairs [(price1, size1),
129        // (price2, size2), ...]
130        if !price_data.bids.is_empty() {
131            let bids_pairs: Vec<(f32, f32)> = price_data
132                .bids
133                .chunks_exact(2)
134                .map(|chunk| (chunk[0], chunk[1]))
135                .collect();
136            let bids_json = serde_json::to_string(&bids_pairs).unwrap_or_default();
137            attributes.insert("bids".to_string(), bids_json.as_bytes().to_vec().into());
138        }
139        if !price_data.asks.is_empty() {
140            let asks_pairs: Vec<(f32, f32)> = price_data
141                .asks
142                .chunks_exact(2)
143                .map(|chunk| (chunk[0], chunk[1]))
144                .collect();
145            let asks_json = serde_json::to_string(&asks_pairs).unwrap_or_default();
146            attributes.insert("asks".to_string(), asks_json.as_bytes().to_vec().into());
147        }
148
149        ComponentWithState {
150            state: ResponseProtocolState {
151                component_id: component_id.clone(),
152                attributes,
153                balances: HashMap::new(),
154            },
155            component: protocol_component,
156            component_tvl: Some(tvl),
157            entrypoints: vec![],
158        }
159    }
160
161    fn process_quote_response(
162        quote_response: BebopQuoteResponse,
163        params: &GetAmountOutParams,
164    ) -> Result<SignedQuote, RFQError> {
165        match quote_response {
166            BebopQuoteResponse::Success(quote) => {
167                quote.validate(params)?;
168
169                let mut quote_attributes: HashMap<String, Bytes> = HashMap::new();
170                quote_attributes.insert("calldata".into(), quote.tx.data);
171                quote_attributes.insert(
172                    "partial_fill_offset".into(),
173                    Bytes::from(
174                        quote
175                            .partial_fill_offset
176                            .to_be_bytes()
177                            .to_vec(),
178                    ),
179                );
180                let signed_quote = match quote.to_sign {
181                    BebopOrderToSign::Single(ref single) => SignedQuote {
182                        base_token: params.token_in.clone(),
183                        quote_token: params.token_out.clone(),
184                        amount_in: BigUint::from_str(&single.taker_amount).map_err(|_| {
185                            RFQError::ParsingError(format!(
186                                "Failed to parse amount in string: {}",
187                                single.taker_amount
188                            ))
189                        })?,
190                        amount_out: BigUint::from_str(&single.maker_amount).map_err(|_| {
191                            RFQError::ParsingError(format!(
192                                "Failed to parse amount out string: {}",
193                                single.maker_amount
194                            ))
195                        })?,
196                        quote_attributes,
197                    },
198                    BebopOrderToSign::Aggregate(aggregate) => {
199                        // Sum taker_amounts for taker_tokens matching the token_in
200                        let amount_in: BigUint = aggregate
201                            .taker_tokens
202                            .iter()
203                            .zip(&aggregate.taker_amounts)
204                            .flat_map(|(tokens, amounts)| {
205                                tokens
206                                    .iter()
207                                    .zip(amounts)
208                                    .filter_map(|(token, amount)| {
209                                        if token == &params.token_in {
210                                            BigUint::from_str(amount).ok()
211                                        } else {
212                                            None
213                                        }
214                                    })
215                            })
216                            .sum();
217
218                        // Sum maker_amounts for maker_tokens matching the token_out
219                        let amount_out: BigUint = aggregate
220                            .maker_tokens
221                            .iter()
222                            .zip(&aggregate.maker_amounts)
223                            .flat_map(|(tokens, amounts)| {
224                                tokens
225                                    .iter()
226                                    .zip(amounts)
227                                    .filter_map(|(token, amount)| {
228                                        if token == &params.token_out {
229                                            BigUint::from_str(amount).ok()
230                                        } else {
231                                            None
232                                        }
233                                    })
234                            })
235                            .sum();
236
237                        SignedQuote {
238                            base_token: params.token_in.clone(),
239                            quote_token: params.token_out.clone(),
240                            amount_in,
241                            amount_out,
242                            quote_attributes,
243                        }
244                    }
245                };
246
247                Ok(signed_quote)
248            }
249            BebopQuoteResponse::Error(err) => Err(RFQError::FatalError(format!(
250                "Bebop API error: code {} - {} (requestId: {})",
251                err.error.error_code, err.error.message, err.error.request_id
252            ))),
253        }
254    }
255}
256
257#[async_trait]
258impl RFQClient for BebopClient {
259    fn stream(
260        &self,
261    ) -> BoxStream<'static, Result<(String, StateSyncMessage<TimestampHeader>), RFQError>> {
262        let tokens = self.tokens.clone();
263        let url = self.price_ws.clone();
264        let tvl_threshold = self.tvl;
265        let name = self.ws_user.clone();
266        let authorization = self.ws_key.clone();
267        let client = self.clone();
268
269        Box::pin(async_stream::stream! {
270            let mut current_components: HashMap<String, ComponentWithState> = HashMap::new();
271            let mut reconnect_attempts = 0;
272            const MAX_RECONNECT_ATTEMPTS: u32 = 10;
273
274            loop {
275                let request = Request::builder()
276                    .method("GET")
277                    .uri(&url)
278                    .header("Host", "api.bebop.xyz")
279                    .header("Upgrade", "websocket")
280                    .header("Connection", "Upgrade")
281                    .header("Sec-WebSocket-Key", generate_key())
282                    .header("Sec-WebSocket-Version", "13")
283                    .header("name", &name)
284                    .header("Authorization", &authorization)
285                    .body(())
286                    .map_err(|_| RFQError::FatalError("Failed to build request".into()))?;
287
288                // Connect to Bebop WebSocket with custom headers
289                let (ws_stream, _) = match connect_async_with_config(request, None, false).await {
290                    Ok(connection) => {
291                        info!("Successfully connected to Bebop WebSocket");
292                        reconnect_attempts = 0; // Reset counter on successful connection
293                        connection
294                    },
295                    Err(e) => {
296                        reconnect_attempts += 1;
297                        error!("Failed to connect to Bebop WebSocket (attempt {}): {}", reconnect_attempts, e);
298
299                        if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
300                            yield Err(RFQError::ConnectionError(format!("Failed to connect after {MAX_RECONNECT_ATTEMPTS} attempts: {e}")));
301                            return;
302                        }
303
304                        let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
305                        info!("Retrying connection in {} seconds...", backoff_duration.as_secs());
306                        sleep(backoff_duration).await;
307                        continue;
308                    }
309                };
310
311                let (_, mut ws_receiver) = ws_stream.split();
312
313                // Message processing loop
314                while let Some(msg) = ws_receiver.next().await {
315                    match msg {
316                        Ok(Message::Binary(data)) => {
317                            match BebopPricingUpdate::decode(&data[..]) {
318                                Ok(protobuf_update) => {
319                                    let mut new_components = HashMap::new();
320
321                                    // Process all pairs directly from protobuf
322                                    for price_data in &protobuf_update.pairs {
323                                        let base_bytes = Bytes::from(price_data.base.clone());
324                                        let quote_bytes = Bytes::from(price_data.quote.clone());
325                                        if tokens.contains(&base_bytes) && tokens.contains(&quote_bytes) {
326                                            let pair_tokens = vec![
327                                                base_bytes.clone(), quote_bytes.clone()
328                                            ];
329
330                                            let mut quote_price_data: Option<&BebopPriceData> = None;
331                                            // The quote token is not one of the approved quote tokens
332                                            // Get the price, so we can normalize our TVL calculation
333                                            if !client.quote_tokens.contains(&quote_bytes) {
334                                                for approved_quote_token in &client.quote_tokens {
335                                                    // Look for a pair containing both our quote token and an approved token
336                                                    // Can be either QUOTE/APPROVED or APPROVED/QUOTE
337                                                    if let Some(quote_data) = protobuf_update.pairs.iter()
338                                                        .find(|p| {
339                                                            (p.base == quote_bytes.as_ref() && p.quote == approved_quote_token.as_ref()) ||
340                                                            (p.quote == quote_bytes.as_ref() && p.base == approved_quote_token.as_ref())
341                                                        }) {
342                                                        quote_price_data = Some(quote_data);
343                                                        break;
344                                                    }
345                                                }
346
347                                                // Quote token doesn't have price levels in approved quote tokens.
348                                                // Skip.
349                                                if quote_price_data.is_none() {
350                                                    warn!("Quote token {} does not have price levels in approved quote token. Skipping.", hex::encode(&quote_bytes));
351                                                    continue;
352                                                }
353                                            }
354
355                                            let tvl = price_data.calculate_tvl(quote_price_data);
356                                            if tvl < tvl_threshold {
357                                                continue;
358                                            }
359
360                                            let pair_str = format!("bebop_{}/{}", hex::encode(&base_bytes), hex::encode(&quote_bytes));
361                                            let component_id = format!("{}", keccak256(pair_str.as_bytes()));
362                                            let component_with_state = client.create_component_with_state(
363                                                component_id.clone(),
364                                                pair_tokens,
365                                                price_data,
366                                                tvl
367                                            );
368                                            new_components.insert(component_id, component_with_state);
369                                        }
370                                    }
371
372                                    // Find components that were removed (existed before but not in this update)
373                                    // This includes components with no bids or asks, since they are filtered
374                                    // out by the tvl threshold.
375                                    let removed_components: HashMap<String, ProtocolComponent> = current_components
376                                        .iter()
377                                        .filter(|&(id, _)| !new_components.contains_key(id))
378                                        .map(|(k, v)| (k.clone(), v.component.clone()))
379                                        .collect();
380
381                                    // Update our current state
382                                    current_components = new_components.clone();
383
384                                    let snapshot = Snapshot {
385                                        states: new_components,
386                                        vm_storage: HashMap::new(),
387                                    };
388                                    let timestamp = SystemTime::now().duration_since(
389                                        SystemTime::UNIX_EPOCH
390                                    ).map_err(
391                                        |_| RFQError::ParsingError("SystemTime before UNIX EPOCH!".into())
392                                    )?.as_secs();
393
394                                    let msg = StateSyncMessage::<TimestampHeader> {
395                                        header: TimestampHeader { timestamp },
396                                        snapshots: snapshot,
397                                        deltas: None, // Deltas are always None - all the changes are absolute
398                                        removed_components,
399                                    };
400
401                                    // Yield one message containing all updated pairs
402                                    yield Ok(("bebop".to_string(), msg));
403                                },
404                                Err(e) => {
405                                    error!("Failed to parse protobuf message: {}", e);
406                                    break;
407                                }
408                            }
409                        }
410                        Ok(Message::Close(_)) => {
411                            info!("WebSocket connection closed by server");
412                            break;
413                        }
414                        Err(e) => {
415                            error!("WebSocket error: {}", e);
416                            break;
417                        }
418                        _ => {} // Ignore other message types
419                    }
420                }
421
422                // If we're here, the message loop exited - always attempt to reconnect
423                reconnect_attempts += 1;
424                if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS {
425                    yield Err(RFQError::ConnectionError(format!("Connection failed after {MAX_RECONNECT_ATTEMPTS} attempts")));
426                    return;
427                }
428
429                let backoff_duration = Duration::from_secs(2_u64.pow(reconnect_attempts.min(5)));
430                info!("Reconnecting in {} seconds (attempt {})...", backoff_duration.as_secs(), reconnect_attempts);
431                sleep(backoff_duration).await;
432                // Continue to the next iteration of the main loop
433            }
434        })
435    }
436
437    async fn request_binding_quote(
438        &self,
439        params: &GetAmountOutParams,
440    ) -> Result<SignedQuote, RFQError> {
441        let sell_token = bytes_to_address(&params.token_in)?.to_string();
442        let buy_token = bytes_to_address(&params.token_out)?.to_string();
443        let sell_amount = params.amount_in.to_string();
444        let sender = bytes_to_address(&params.sender)?.to_string();
445        let receiver = bytes_to_address(&params.receiver)?.to_string();
446
447        let url = self.quote_endpoint.clone();
448
449        let client = Client::new();
450
451        let start_time = std::time::Instant::now();
452        const MAX_RETRIES: u32 = 3;
453        let mut last_error = None;
454
455        for attempt in 0..MAX_RETRIES {
456            // Check if we have time remaining for this attempt
457            let elapsed = start_time.elapsed();
458            if elapsed >= self.quote_timeout {
459                return Err(last_error.unwrap_or_else(|| {
460                    RFQError::ConnectionError(format!(
461                        "Bebop quote request timed out after {} seconds",
462                        self.quote_timeout.as_secs()
463                    ))
464                }));
465            }
466
467            let remaining_time = self.quote_timeout - elapsed;
468
469            let request = client
470                .get(&url)
471                .query(&[
472                    ("sell_tokens", sell_token.clone()),
473                    ("buy_tokens", buy_token.clone()),
474                    ("sell_amounts", sell_amount.clone()),
475                    ("taker_address", sender.clone()),
476                    ("receiver_address", receiver.clone()),
477                    ("approval_type", "Standard".into()),
478                    ("skip_validation", "true".into()),
479                    ("skip_taker_checks", "true".into()),
480                    ("gasless", "false".into()),
481                    ("expiry_type", "standard".into()),
482                    ("fee", "0".into()),
483                    ("is_ui", "false".into()),
484                    ("source", self.ws_user.clone()),
485                ])
486                .header("accept", "application/json")
487                .header("name", &self.ws_user)
488                .header("source-auth", &self.ws_key)
489                .header("Authorization", &self.ws_key);
490
491            let response = match timeout(remaining_time, request.send()).await {
492                Ok(Ok(resp)) => resp,
493                Ok(Err(e)) => {
494                    warn!(
495                        "Bebop quote request failed (attempt {}/{}): {}",
496                        attempt + 1,
497                        MAX_RETRIES,
498                        e
499                    );
500                    last_error = Some(RFQError::ConnectionError(format!(
501                        "Failed to send Bebop quote request: {e}"
502                    )));
503                    if attempt < MAX_RETRIES - 1 {
504                        continue;
505                    } else {
506                        return Err(last_error.unwrap());
507                    }
508                }
509                Err(_) => {
510                    return Err(RFQError::ConnectionError(format!(
511                        "Bebop quote request timed out after {} seconds",
512                        self.quote_timeout.as_secs()
513                    )));
514                }
515            };
516
517            let quote_response = match response
518                .json::<BebopQuoteResponse>()
519                .await
520            {
521                Ok(resp) => resp,
522                Err(e) => {
523                    warn!(
524                        "Bebop quote response parsing failed (attempt {}/{}): {}",
525                        attempt + 1,
526                        MAX_RETRIES,
527                        e
528                    );
529                    last_error = Some(RFQError::ParsingError(format!(
530                        "Failed to parse Bebop quote response: {e}"
531                    )));
532                    if attempt < MAX_RETRIES - 1 {
533                        sleep(Duration::from_millis(100)).await;
534                        continue;
535                    } else {
536                        return Err(last_error.unwrap());
537                    }
538                }
539            };
540
541            return Self::process_quote_response(quote_response, params);
542        }
543
544        Err(last_error.unwrap_or_else(|| {
545            RFQError::ConnectionError("Bebop quote request failed after retries".to_string())
546        }))
547    }
548}
549
550#[cfg(test)]
551mod tests {
552    use std::{
553        sync::{Arc, Mutex},
554        time::Duration,
555    };
556
557    use dotenv::dotenv;
558    use futures::SinkExt;
559    use tokio::{net::TcpListener, time::timeout};
560    use tokio_tungstenite::accept_async;
561
562    use super::*;
563    use crate::rfq::constants::get_bebop_auth;
564
565    #[tokio::test]
566    #[ignore] // Requires network access and setting proper env vars
567    async fn test_bebop_websocket_connection() {
568        // We test with quote tokens that are not USDC in order to ensure our normalization works
569        // fine
570        let wbtc = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
571        let weth = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
572
573        dotenv().expect("Missing .env file");
574        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
575
576        let quote_tokens = HashSet::from([
577            // Use addresses we forgot to checksum (to test checksumming)
578            Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(), // USDC
579            Bytes::from_str("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), // USDT
580        ]);
581
582        let client = BebopClient::new(
583            Chain::Ethereum,
584            HashSet::from_iter(vec![weth.clone(), wbtc.clone()]),
585            10.0, // $10 minimum TVL
586            auth.user,
587            auth.key,
588            quote_tokens,
589            Duration::from_secs(30),
590        )
591        .unwrap();
592
593        let mut stream = client.stream();
594
595        // Test connection and message reception with timeout
596        let result = timeout(Duration::from_secs(10), async {
597            let mut message_count = 0;
598            let max_messages = 5;
599
600            while let Some(result) = stream.next().await {
601                match result {
602                    Ok((component_id, msg)) => {
603                        println!("Received message with ID: {component_id}");
604
605                        assert!(!component_id.is_empty());
606                        assert_eq!(component_id, "bebop");
607                        assert!(msg.header.timestamp > 0);
608                        assert!(!msg.snapshots.states.is_empty());
609
610                        let snapshot = &msg.snapshots;
611
612                        // We got at least one component
613                        assert!(!snapshot.states.is_empty());
614
615                        println!("Received {} components in this message", snapshot.states.len());
616                        for (id, component_with_state) in &snapshot.states {
617                            assert_eq!(
618                                component_with_state
619                                    .component
620                                    .protocol_system,
621                                "rfq:bebop"
622                            );
623                            assert_eq!(
624                                component_with_state
625                                    .component
626                                    .protocol_type_name,
627                                "bebop_pool"
628                            );
629                            assert_eq!(
630                                component_with_state.component.chain,
631                                Chain::Ethereum.into()
632                            );
633
634                            let attributes = &component_with_state.state.attributes;
635
636                            // Check that bids and asks exist and have non-empty byte strings
637                            assert!(attributes.contains_key("bids"));
638                            assert!(attributes.contains_key("asks"));
639                            assert!(!attributes["bids"].is_empty());
640                            assert!(!attributes["asks"].is_empty());
641
642                            if let Some(tvl) = component_with_state.component_tvl {
643                                assert!(tvl >= 0.0);
644                                println!("Component {id} TVL: ${tvl:.2}");
645                            }
646                        }
647
648                        message_count += 1;
649                        if message_count >= max_messages {
650                            break;
651                        }
652                    }
653                    Err(e) => {
654                        panic!("Stream error: {e}");
655                    }
656                }
657            }
658
659            assert!(message_count > 0, "Should have received at least one message");
660            println!("Successfully received {message_count} messages");
661        })
662        .await;
663
664        match result {
665            Ok(_) => println!("Test completed successfully"),
666            Err(_) => panic!("Test timed out - no messages received within 10 seconds"),
667        }
668    }
669
670    #[tokio::test]
671    async fn test_websocket_reconnection() {
672        // Start a mock WebSocket server that will drop connections intermittently
673        let listener = TcpListener::bind("127.0.0.1:0")
674            .await
675            .unwrap();
676        let addr = listener.local_addr().unwrap();
677
678        // Creates a thread-safe counter.
679        let connection_count = Arc::new(Mutex::new(0u32));
680
681        // We must clone - since we want to read the original value at the end of the test.
682        let connection_count_clone = connection_count.clone();
683
684        tokio::spawn(async move {
685            while let Ok((stream, _)) = listener.accept().await {
686                *connection_count_clone.lock().unwrap() += 1;
687                let count = *connection_count_clone.lock().unwrap();
688                println!("Mock server: Connection #{count} established");
689
690                tokio::spawn(async move {
691                    if let Ok(ws_stream) = accept_async(stream).await {
692                        let (mut ws_sender, _ws_receiver) = ws_stream.split();
693
694                        // Create test protobuf message
695                        let weth_addr =
696                            hex::decode("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
697                        let usdc_addr =
698                            hex::decode("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap();
699
700                        let test_price_data = BebopPriceData {
701                            base: weth_addr,
702                            quote: usdc_addr,
703                            last_update_ts: 1752617378,
704                            bids: vec![3070.05f32, 0.325717f32],
705                            asks: vec![3070.527f32, 0.325717f32],
706                        };
707
708                        let pricing_update = BebopPricingUpdate { pairs: vec![test_price_data] };
709
710                        let test_message = pricing_update.encode_to_vec();
711
712                        if count == 1 {
713                            // First connection: Send message successfully, then drop
714                            println!("Mock server: Connection #1 - sending message then dropping.");
715                            let _ = ws_sender
716                                .send(Message::Binary(test_message.clone().into()))
717                                .await;
718
719                            // Give time for message to be processed, then drop the connection.
720                            tokio::time::sleep(Duration::from_millis(100)).await;
721                            println!("Mock server: Dropping connection #1");
722                            let _ = ws_sender.close().await;
723                        } else if count == 2 {
724                            // Second connection: Send message successfully and maintain connection
725                            println!("Mock server: Connection #2 - maintaining stable connection.");
726                            let _ = ws_sender
727                                .send(Message::Binary(test_message.clone().into()))
728                                .await;
729                        }
730                    }
731                });
732            }
733        });
734
735        // Wait a moment for the server to start
736        tokio::time::sleep(Duration::from_millis(50)).await;
737
738        let mut test_quote_tokens = HashSet::new();
739        test_quote_tokens
740            .insert(Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap());
741
742        let tokens_formatted = vec![
743            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap(),
744            Bytes::from_str("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48").unwrap(),
745        ];
746
747        // Bypass the new() constructor to mock the URL to point to our mock server.
748        let client = BebopClient {
749            chain: Chain::Ethereum,
750            price_ws: format!("ws://127.0.0.1:{}", addr.port()),
751            tokens: tokens_formatted.into_iter().collect(),
752            tvl: 1000.0,
753            ws_user: "test_user".to_string(),
754            ws_key: "test_key".to_string(),
755            quote_tokens: test_quote_tokens,
756            quote_endpoint: "".to_string(),
757            quote_timeout: Duration::from_secs(5),
758        };
759
760        let start_time = std::time::Instant::now();
761        let mut successful_messages = 0;
762        let mut connection_errors = 0;
763        let mut first_message_received = false;
764        let mut second_message_received = false;
765
766        // Expected flow:
767        // 1. Receive first message successfully
768        // 2. Connection drops
769        // 3. Client reconnects
770        // 4. Receive second message successfully
771        // Timeout if two messages are not received within 5 seconds.
772        while start_time.elapsed() < Duration::from_secs(5) && successful_messages < 2 {
773            match timeout(Duration::from_millis(1000), client.stream().next()).await {
774                Ok(Some(result)) => match result {
775                    Ok((_component_id, _message)) => {
776                        successful_messages += 1;
777                        println!("Received successful message {successful_messages}");
778
779                        if successful_messages == 1 {
780                            first_message_received = true;
781                            println!("First message received - connection should drop after this.");
782                        } else if successful_messages == 2 {
783                            second_message_received = true;
784                            println!("Second message received after reconnection.");
785                        }
786                    }
787                    Err(e) => {
788                        connection_errors += 1;
789                        println!("Connection error during reconnection: {e:?}");
790                    }
791                },
792                Ok(None) => {
793                    panic!("Stream ended unexpectedly");
794                }
795                Err(_) => {
796                    println!("Timeout waiting for message (normal during reconnections)");
797                    continue;
798                }
799            }
800        }
801
802        let final_connection_count = *connection_count.lock().unwrap();
803
804        // 1. Exactly 2 connection attempts (initial + reconnect)
805        // 2. Exactly 2 successful messages (one before drop, one after reconnect)
806
807        assert_eq!(final_connection_count, 2);
808        assert!(first_message_received);
809        assert!(second_message_received);
810        assert_eq!(connection_errors, 0);
811        assert_eq!(successful_messages, 2);
812    }
813
814    #[tokio::test]
815    #[ignore] // Requires network access and setting proper env vars
816    async fn test_bebop_quote_single_order() {
817        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
818        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
819        dotenv().expect("Missing .env file");
820        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
821
822        let client = BebopClient::new(
823            Chain::Ethereum,
824            HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
825            10.0, // $10 minimum TVL
826            auth.user,
827            auth.key,
828            HashSet::new(),
829            Duration::from_secs(30),
830        )
831        .unwrap();
832
833        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
834
835        let params = GetAmountOutParams {
836            amount_in: BigUint::from(1_000000000000000000u64),
837            token_in: token_in.clone(),
838            token_out: token_out.clone(),
839            sender: router.clone(),
840            receiver: router,
841        };
842        let quote = client
843            .request_binding_quote(&params)
844            .await
845            .unwrap();
846
847        assert_eq!(quote.base_token, token_in);
848        assert_eq!(quote.quote_token, token_out);
849        assert_eq!(quote.amount_in, BigUint::from(1_000000000000000000u64));
850
851        // Assuming the BTC - WETH price doesn't change too much at the time of running this
852        assert!(quote.amount_out > BigUint::from(3000000u64));
853
854        // SWAP_SINGLE_SELECTOR = 0x4dcebcba;
855        assert_eq!(
856            quote
857                .quote_attributes
858                .get("calldata")
859                .unwrap()[..4],
860            Bytes::from_str("0x4dcebcba")
861                .unwrap()
862                .to_vec()
863        );
864        let partial_fill_offset_slice = quote
865            .quote_attributes
866            .get("partial_fill_offset")
867            .unwrap()
868            .as_ref();
869        let mut partial_fill_offset_array = [0u8; 8];
870        partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);
871
872        assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 12);
873    }
874
875    #[tokio::test]
876    #[ignore] // Requires network access and setting proper env vars
877    async fn test_bebop_quote_aggregate_order() {
878        // This will make a quote request similar to the previous test but with a very big amount
879        // We expect the Bebop Quote to have an aggregate order (split between different mms)
880        let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
881        let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
882        dotenv().expect("Missing .env file");
883        let auth = get_bebop_auth().expect("Failed to get Bebop authentication");
884
885        let client = BebopClient::new(
886            Chain::Ethereum,
887            HashSet::from_iter(vec![token_in.clone(), token_out.clone()]),
888            10.0, // $10 minimum TVL
889            auth.user,
890            auth.key,
891            HashSet::new(),
892            Duration::from_secs(30),
893        )
894        .unwrap();
895
896        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
897
898        let amount_in = BigUint::from_str("20_000_000_000").unwrap(); // 20k USDC
899        let params = GetAmountOutParams {
900            amount_in: amount_in.clone(),
901            token_in: token_in.clone(),
902            token_out: token_out.clone(),
903            sender: router.clone(),
904            receiver: router,
905        };
906        let quote = client
907            .request_binding_quote(&params)
908            .await
909            .unwrap();
910
911        assert_eq!(quote.base_token, token_in);
912        assert_eq!(quote.quote_token, token_out);
913        assert_eq!(quote.amount_in, amount_in);
914
915        // Assuming the USDC - ONDO price doesn't change too much at the time of running this
916        assert!(quote.amount_out > BigUint::from_str("18000000000000000000000").unwrap()); // ~19k ONDO
917
918        // SWAP_AGGREGATE_SELECTOR = 0xa2f74893;
919        assert_eq!(
920            quote
921                .quote_attributes
922                .get("calldata")
923                .unwrap()[..4],
924            Bytes::from_str("0xa2f74893")
925                .unwrap()
926                .to_vec()
927        );
928        let partial_fill_offset_slice = quote
929            .quote_attributes
930            .get("partial_fill_offset")
931            .unwrap()
932            .as_ref();
933        let mut partial_fill_offset_array = [0u8; 8];
934        partial_fill_offset_array.copy_from_slice(partial_fill_offset_slice);
935
936        // This is the only attribute that is significantly different for the Single and Aggregate
937        // Order
938        assert_eq!(u64::from_be_bytes(partial_fill_offset_array), 2);
939    }
940
/// Feeds the stored `aggregate_order.json` fixture through
/// `BebopClient::process_quote_response` and checks the resulting amounts and tokens.
///
/// The asserted `amount_in` (20000000000) differs from `params.amount_in`, so it is
/// presumably taken from the fixture rather than from the request parameters.
#[test]
fn test_process_bebop_quote_response_aggregate_order() {
    let fixture_path = "src/rfq/protocols/bebop/test_responses/aggregate_order.json";
    let raw_response = std::fs::read_to_string(fixture_path).unwrap();
    let quote_response: BebopQuoteResponse = serde_json::from_str(&raw_response).unwrap();

    let router = Bytes::from_str("0xfd0b31d2e955fa55e3fa641fe90e08b677188d35").unwrap();
    let params = GetAmountOutParams {
        amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
        token_in: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
        token_out: Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap(),
        sender: router.clone(),
        receiver: router,
    };

    let quote = BebopClient::process_quote_response(quote_response, &params).unwrap();

    let expected_amount_out = BigUint::from_str("21700473797683400419007").unwrap();
    let expected_amount_in = BigUint::from_str("20000000000").unwrap();
    assert_eq!(quote.amount_out, expected_amount_out);
    assert_eq!(quote.amount_in, expected_amount_in);
    assert_eq!(quote.base_token, params.token_in);
    assert_eq!(quote.quote_token, params.token_out);
}
960
/// Feeds the stored `aggregate_order_with_multihop.json` fixture through
/// `BebopClient::process_quote_response` and checks the resulting amounts and tokens.
#[test]
fn test_process_bebop_quote_response_aggregate_order_with_multihop() {
    let fixture_path =
        "src/rfq/protocols/bebop/test_responses/aggregate_order_with_multihop.json";
    let raw_response = std::fs::read_to_string(fixture_path).unwrap();
    let quote_response: BebopQuoteResponse = serde_json::from_str(&raw_response).unwrap();

    // Sender and receiver are the same address in this scenario.
    let taker = Bytes::from_str("0x809305d724B6E79C71e10a097ABadd1274B9C279").unwrap();
    let params = GetAmountOutParams {
        amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
        token_in: Bytes::from_str("0xDEf1CA1fb7FBcDC777520aa7f396b4E015F497aB").unwrap(),
        token_out: Bytes::from_str("0xdAC17F958D2ee523a2206206994597C13D831ec7").unwrap(),
        sender: taker.clone(),
        receiver: taker,
    };

    let quote = BebopClient::process_quote_response(quote_response, &params).unwrap();

    let expected_amount_out = BigUint::from_str("11186653890").unwrap();
    let expected_amount_in = BigUint::from_str("43067495979235520920162").unwrap();
    assert_eq!(quote.amount_out, expected_amount_out);
    assert_eq!(quote.amount_in, expected_amount_in);
    assert_eq!(quote.base_token, params.token_in);
    assert_eq!(quote.quote_token, params.token_out);
}
981
982    /// Helper function to create a mock server that responds after a delay
983    async fn create_delayed_response_server(delay_ms: u64) -> std::net::SocketAddr {
984        use tokio::io::AsyncWriteExt;
985
986        let listener = TcpListener::bind("127.0.0.1:0")
987            .await
988            .unwrap();
989        let addr = listener.local_addr().unwrap();
990
991        let json_response =
992            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
993                .unwrap();
994
995        tokio::spawn(async move {
996            while let Ok((mut stream, _)) = listener.accept().await {
997                let json_response_clone = json_response.clone();
998                tokio::spawn(async move {
999                    sleep(Duration::from_millis(delay_ms)).await;
1000
1001                    let response = format!(
1002                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1003                        json_response_clone.len(),
1004                        json_response_clone
1005                    );
1006                    let _ = stream
1007                        .write_all(response.as_bytes())
1008                        .await;
1009                    let _ = stream.flush().await;
1010                    let _ = stream.shutdown().await;
1011                });
1012            }
1013        });
1014
1015        addr
1016    }
1017
1018    fn create_test_bebop_client(quote_endpoint: String, quote_timeout: Duration) -> BebopClient {
1019        let token_in = Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap();
1020        let token_out = Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap();
1021
1022        BebopClient {
1023            chain: Chain::Ethereum,
1024            price_ws: "ws://example.com".to_string(),
1025            quote_endpoint,
1026            tokens: HashSet::from([token_in, token_out]),
1027            tvl: 10.0,
1028            ws_user: "test_user".to_string(),
1029            ws_key: "test_key".to_string(),
1030            quote_tokens: HashSet::new(),
1031            quote_timeout,
1032        }
1033    }
1034
1035    /// Helper function to create test quote params matching aggregate_order.json
1036    fn create_test_quote_params() -> GetAmountOutParams {
1037        let token_in = Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap();
1038        let token_out = Bytes::from_str("0xfAbA6f8e4a5E8Ab82F62fe7C39859FA577269BE3").unwrap();
1039        let router = Bytes::from_str("0xfD0b31d2E955fA55e3fa641Fe90e08b677188d35").unwrap();
1040
1041        GetAmountOutParams {
1042            amount_in: BigUint::from_str("43067495979235520920162").unwrap(),
1043            token_in,
1044            token_out,
1045            sender: router.clone(),
1046            receiver: router,
1047        }
1048    }
1049
1050    #[tokio::test]
1051    async fn test_bebop_quote_timeout() {
1052        let addr = create_delayed_response_server(500).await;
1053
1054        // Test 1: Client with short timeout (200ms) - should timeout
1055        let client_short_timeout = create_test_bebop_client(
1056            format!("http://127.0.0.1:{}/quote", addr.port()),
1057            Duration::from_millis(200),
1058        );
1059        let params = create_test_quote_params();
1060
1061        let start = std::time::Instant::now();
1062        let result = client_short_timeout
1063            .request_binding_quote(&params)
1064            .await;
1065        let elapsed = start.elapsed();
1066
1067        assert!(result.is_err());
1068        let err = result.unwrap_err();
1069        match err {
1070            RFQError::ConnectionError(msg) => {
1071                assert!(msg.contains("timed out"), "Expected timeout error, got: {}", msg);
1072            }
1073            _ => panic!("Expected ConnectionError, got: {:?}", err),
1074        }
1075        assert!(
1076            elapsed.as_millis() >= 200 && elapsed.as_millis() < 400,
1077            "Expected timeout around 200ms, got: {:?}",
1078            elapsed
1079        );
1080
1081        // Test 2: Client with long timeout (1 seconds) - should wait and receive response
1082        // Note: With retry logic, we may need multiple attempts if the response is malformed,
1083        // so we need a longer timeout to account for retries
1084        let client_long_timeout = create_test_bebop_client(
1085            format!("http://127.0.0.1:{}/quote", addr.port()),
1086            Duration::from_secs(1),
1087        );
1088
1089        let result = client_long_timeout
1090            .request_binding_quote(&params)
1091            .await;
1092
1093        // Should succeed - the server waits 500ms which is within the 1s timeout
1094        assert!(result.is_ok(), "Expected success, got: {:?}", result);
1095        let quote = result.unwrap();
1096
1097        // Verify the quote matches what we expect from aggregate_order.json
1098        assert_eq!(quote.base_token, params.token_in);
1099        assert_eq!(quote.quote_token, params.token_out);
1100    }
1101
1102    /// Helper function to create a mock server that fails twice, then succeeds with
1103    /// aggregate_order.json
1104    async fn create_retry_server() -> (std::net::SocketAddr, Arc<Mutex<u32>>) {
1105        use std::sync::{Arc, Mutex};
1106
1107        use tokio::io::AsyncWriteExt;
1108
1109        let request_count = Arc::new(Mutex::new(0u32));
1110        let request_count_clone = request_count.clone();
1111
1112        let listener = TcpListener::bind("127.0.0.1:0")
1113            .await
1114            .unwrap();
1115        let addr = listener.local_addr().unwrap();
1116
1117        let json_response =
1118            std::fs::read_to_string("src/rfq/protocols/bebop/test_responses/aggregate_order.json")
1119                .unwrap();
1120
1121        tokio::spawn(async move {
1122            while let Ok((mut stream, _)) = listener.accept().await {
1123                let count_clone = request_count_clone.clone();
1124                let json_response_clone = json_response.clone();
1125                tokio::spawn(async move {
1126                    *count_clone.lock().unwrap() += 1;
1127                    let count = *count_clone.lock().unwrap();
1128                    println!("Mock server: Received request #{count}");
1129
1130                    if count <= 2 {
1131                        let response = "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 21\r\n\r\nInternal Server Error";
1132                        let _ = stream
1133                            .write_all(response.as_bytes())
1134                            .await;
1135                    } else {
1136                        let response = format!(
1137                            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1138                            json_response_clone.len(),
1139                            json_response_clone
1140                        );
1141                        let _ = stream
1142                            .write_all(response.as_bytes())
1143                            .await;
1144                    }
1145                    let _ = stream.flush().await;
1146                    let _ = stream.shutdown().await;
1147                });
1148            }
1149        });
1150        (addr, request_count)
1151    }
1152
1153    #[tokio::test]
1154    async fn test_bebop_quote_retry_on_bad_response() {
1155        let (addr, request_count) = create_retry_server().await;
1156
1157        let client = create_test_bebop_client(
1158            format!("http://127.0.0.1:{}/quote", addr.port()),
1159            Duration::from_secs(5),
1160        );
1161        let params = create_test_quote_params();
1162        let result = client
1163            .request_binding_quote(&params)
1164            .await;
1165
1166        assert!(result.is_ok(), "Expected success after retries, got: {:?}", result);
1167        let quote = result.unwrap();
1168
1169        // Verify the quote (amounts from aggregate_order.json)
1170        assert_eq!(quote.amount_in, BigUint::from_str("20000000000").unwrap());
1171        assert_eq!(quote.amount_out, BigUint::from_str("21700473797683400419007").unwrap());
1172
1173        // Verify exactly 3 requests were made (2 failures + 1 success)
1174        let final_count = *request_count.lock().unwrap();
1175        assert_eq!(final_count, 3, "Expected 3 requests, got {}", final_count);
1176    }
1177
/// Serializes a `BebopClient` to JSON and back, checking that every configuration field
/// survives the round trip while the `ws_user` / `ws_key` credentials come back empty.
#[test]
fn test_bebop_client_serialize_deserialize_roundtrip() {
    let tokens = HashSet::from([
        Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2").unwrap(),
        Bytes::from_str("0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599").unwrap(),
    ]);
    let quote_tokens =
        HashSet::from([Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap()]);

    let original = BebopClient {
        chain: Chain::Ethereum,
        price_ws: String::from("wss://api.bebop.xyz/pricing"),
        quote_endpoint: String::from("https://api.bebop.xyz/quote"),
        tokens,
        tvl: 50.5,
        ws_user: String::from("secret_user"),
        ws_key: String::from("secret_key"),
        quote_tokens,
        quote_timeout: Duration::from_millis(5500),
    };

    let json = serde_json::to_string(&original).unwrap();
    let restored: BebopClient = serde_json::from_str(&json).unwrap();

    // Fields that should round-trip correctly
    assert_eq!(restored.chain, original.chain);
    assert_eq!(restored.price_ws, original.price_ws);
    assert_eq!(restored.quote_endpoint, original.quote_endpoint);
    assert_eq!(restored.tokens, original.tokens);
    assert_eq!(restored.tvl, original.tvl);
    assert_eq!(restored.quote_tokens, original.quote_tokens);
    assert_eq!(restored.quote_timeout, original.quote_timeout);

    // ws_user and ws_key should NOT round-trip (skip_serializing + default)
    assert_eq!(restored.ws_user, "");
    assert_eq!(restored.ws_key, "");
    assert_ne!(restored.ws_user, original.ws_user);
    assert_ne!(restored.ws_key, original.ws_key);
}
1214
/// Checks that `ws_user` and `ws_key` are read when they are present in the input JSON.
#[test]
fn test_bebop_client_deserialize_with_credentials() {
    // When ws_user and ws_key are provided in JSON, they should be deserialized
    // (skip_serializing only affects serialization, not deserialization)
    let payload = r#"{
            "chain": "ethereum",
            "price_ws": "wss://api.bebop.xyz/pricing",
            "quote_endpoint": "https://api.bebop.xyz/quote",
            "tokens": [],
            "tvl": 10.0,
            "ws_user": "provided_user",
            "ws_key": "provided_key",
            "quote_tokens": [],
            "quote_timeout": {"secs": 30, "nanos": 0}
        }"#;

    let parsed: BebopClient = serde_json::from_str(payload).unwrap();

    // Credentials should be deserialized from JSON
    assert_eq!(parsed.ws_user, "provided_user");
    assert_eq!(parsed.ws_key, "provided_key");
}
1237}