1pub use {
2 express_relay_api_types as api_types,
3 solana_sdk,
4};
5use {
6 express_relay_api_types::{
7 bid::{
8 BidCreate,
9 BidCreateOnChainSvm,
10 BidCreateSvm,
11 BidCreateSwapSvm,
12 BidCreateSwapSvmTag,
13 },
14 opportunity::{
15 FeeToken,
16 GetOpportunitiesQueryParams,
17 Opportunity,
18 OpportunityParamsSvm,
19 OpportunityParamsV1ProgramSvm,
20 QuoteTokens,
21 Route,
22 },
23 ws::ServerResultMessage,
24 ErrorBodyResponse,
25 Routable,
26 },
27 futures_util::{
28 SinkExt,
29 Stream,
30 StreamExt,
31 },
32 reqwest::Response,
33 serde::{
34 de::DeserializeOwned,
35 Deserialize,
36 Serialize,
37 },
38 solana_sdk::transaction::Transaction,
39 spl_token::native_mint,
40 std::{
41 collections::HashMap,
42 marker::PhantomData,
43 pin::Pin,
44 sync::Arc,
45 task::{
46 Context,
47 Poll,
48 },
49 time::Duration,
50 },
51 svm::{
52 GetSubmitBidInstructionParams,
53 GetSwapInstructionParams,
54 },
55 tokio::{
56 net::TcpStream,
57 sync::{
58 broadcast,
59 mpsc,
60 oneshot,
61 RwLock,
62 },
63 },
64 tokio_stream::wrappers::{
65 errors::BroadcastStreamRecvError,
66 BroadcastStream,
67 },
68 tokio_tungstenite::{
69 connect_async,
70 tungstenite::{
71 client::IntoClientRequest,
72 Message,
73 },
74 MaybeTlsStream,
75 WebSocketStream,
76 },
77 url::Url,
78};
79
80pub mod svm;
81
82pub struct ClientInner {
83 http_url: Url,
84 ws_url: Url,
85 api_key: Option<String>,
86 client: reqwest::Client,
87}
88
89#[derive(Clone)]
90pub struct Client {
91 inner: Arc<ClientInner>,
92}
93
94#[derive(Debug, Clone)]
95pub struct ClientConfig {
96 pub http_url: String,
97 pub api_key: Option<String>,
98}
99
100#[derive(Debug)]
101pub enum ClientError {
102 InvalidHttpUrl(String),
103 RequestFailed(reqwest::Error),
104 RequestError(String),
105 DecodeResponseFailed(reqwest::Error),
106 WsConnectFailed(String),
107 WsRequestFailed(String),
108 InvalidResponse(String),
109 ChainNotSupported,
110 NewBidError(String),
111 SvmError(String),
112}
113
114enum DecodedResponse<T: DeserializeOwned> {
115 Ok(T),
116 Err(ErrorBodyResponse),
117}
118
119impl<'de, T: DeserializeOwned> serde::Deserialize<'de> for DecodedResponse<T> {
120 fn deserialize<D>(deserializer: D) -> Result<DecodedResponse<T>, D::Error>
121 where
122 D: ::serde::Deserializer<'de>,
123 {
124 let json_value = serde_json::Value::deserialize(deserializer)?;
125 let value: Result<T, serde_json::Error> = serde_json::from_value(json_value.clone());
126 match value {
127 Ok(response) => Ok(DecodedResponse::Ok(response)),
128 Err(error) => serde_json::from_value(json_value)
129 .map(DecodedResponse::Err)
130 .map_err(|_| serde::de::Error::custom(error)),
131 }
132 }
133}
134
135type WsRequest = (
136 api_types::ws::ClientRequest,
137 oneshot::Sender<ServerResultMessage>,
138);
139
140pub struct WsClientInner {
141 #[allow(dead_code)]
142 ws: tokio::task::JoinHandle<()>,
143 request_sender: mpsc::UnboundedSender<WsRequest>,
144 request_id: RwLock<u64>,
145
146 update_receiver: broadcast::Receiver<api_types::ws::ServerUpdateResponse>,
147}
148
149#[derive(Clone)]
150pub struct WsClient {
151 inner: Arc<WsClientInner>,
152}
153
154#[derive(Deserialize)]
155#[serde(untagged)]
156#[allow(clippy::large_enum_variant)]
157enum MessageType {
158 Response(api_types::ws::ServerResultResponse),
159 Update(api_types::ws::ServerUpdateResponse),
160}
161
162pub struct WsClientUpdateStream<'a> {
178 stream: BroadcastStream<api_types::ws::ServerUpdateResponse>,
179 _lifetime: PhantomData<&'a ()>,
180}
181
182impl WsClientUpdateStream<'_> {
183 pub fn new(stream: BroadcastStream<api_types::ws::ServerUpdateResponse>) -> Self {
184 Self {
185 stream,
186 _lifetime: PhantomData,
187 }
188 }
189}
190
191impl Stream for WsClientUpdateStream<'_> {
193 type Item = Result<api_types::ws::ServerUpdateResponse, BroadcastStreamRecvError>;
194
195 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
196 let stream = &mut self.get_mut().stream;
197 stream.poll_next_unpin(cx)
198 }
199}
200
201impl WsClient {
202 pub fn get_update_stream(&self) -> WsClientUpdateStream {
213 WsClientUpdateStream::new(BroadcastStream::new(
214 self.inner.update_receiver.resubscribe(),
215 ))
216 }
217
218 async fn run(
237 mut ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
238 mut request_receiver: mpsc::UnboundedReceiver<WsRequest>,
239 update_sender: broadcast::Sender<api_types::ws::ServerUpdateResponse>,
240 ) {
241 let mut connection_check = tokio::time::interval(Duration::from_secs(1));
242 let ping_duration = Duration::from_secs(32); let mut latest_ping = tokio::time::Instant::now();
244 let mut requests_map = HashMap::<String, oneshot::Sender<ServerResultMessage>>::new();
245 loop {
246 tokio::select! {
247 message = ws_stream.next() => {
248 let message = match message {
249 Some(message) => {
250 match message {
251 Ok(message) => message,
252 Err(_) => continue,
253 }
254 }
255 None => break,
256 };
257
258 let message = match message {
259 Message::Text(text) => {
260 let response: Result<MessageType, serde_json::Error> = serde_json::from_str(&text);
261 match response {
262 Ok(response) => response,
263 Err(_) => continue,
264 }
265 }
266 Message::Binary(binary) => {
267 let response: Result<MessageType, serde_json::Error> = serde_json::from_slice(binary.as_slice());
268 match response {
269 Ok(response) => response,
270 Err(_) => continue,
271 }
272 }
273 Message::Close(_) => break,
274 Message::Pong(_) => continue,
275 Message::Ping(data) => {
276 latest_ping = tokio::time::Instant::now();
277 let _ = ws_stream.send(Message::Pong(data)).await;
278 continue;
279 },
280 Message::Frame(_) => continue,
281 };
282
283 match message {
284 MessageType::Response(response) => {
285 response.id.and_then(|id| requests_map.remove(&id)).map(|sender| sender.send(response.result));
286 }
287 MessageType::Update(update) => {
288 _ = update_sender.send(update);
289 continue;
290 }
291 }
292 }
293 request = request_receiver.recv() => {
294 match request {
295 Some((request, response_sender)) => {
296 if ws_stream.send(Message::Text(serde_json::to_string(&request).unwrap())).await.is_ok() {
297 requests_map.insert(request.id.clone(), response_sender);
298 }
299 }
300 None => break,
301 }
302 }
303 _ = connection_check.tick() => {
304 if latest_ping.elapsed() > ping_duration {
305 break;
306 }
307 },
308 }
309 }
310 }
311
312 async fn fetch_add_request_id(&self) -> u64 {
313 let mut write_guard = self.inner.request_id.write().await;
314 *write_guard += 1;
315 *write_guard
316 }
317
318 async fn send(
331 &self,
332 message: api_types::ws::ClientMessage,
333 ) -> Result<ServerResultMessage, ClientError> {
334 let request_id = self.fetch_add_request_id().await;
335 let request = api_types::ws::ClientRequest {
336 id: request_id.to_string(),
337 msg: message,
338 };
339
340 let (response_sender, response_receiver) = oneshot::channel();
341 if self
342 .inner
343 .request_sender
344 .send((request, response_sender))
345 .is_err()
346 {
347 return Err(ClientError::WsRequestFailed(
348 "Failed to send request".to_string(),
349 ));
350 }
351
352 match tokio::time::timeout(Duration::from_secs(5), response_receiver).await {
353 Ok(response) => match response {
354 Ok(response) => Ok(response),
355 Err(_) => Err(ClientError::WsRequestFailed(
356 "Response channel closed".to_string(),
357 )),
358 },
359 Err(_) => Err(ClientError::WsRequestFailed(
361 "Ws request timeout".to_string(),
362 )),
363 }
364 }
365
366 pub async fn chain_subscribe(&self, chain_ids: Vec<String>) -> Result<(), ClientError> {
380 let message = api_types::ws::ClientMessage::Subscribe {
381 chain_ids: chain_ids
382 .iter()
383 .map(|chain_id| chain_id.to_string())
384 .collect(),
385 };
386 let result = self.send(message).await?;
387 match result {
388 ServerResultMessage::Success(_) => Ok(()),
389 ServerResultMessage::Err(error) => Err(ClientError::WsRequestFailed(error)),
390 }
391 }
392
393 pub async fn chain_unsubscribe(&self, chain_ids: Vec<String>) -> Result<(), ClientError> {
407 let message = api_types::ws::ClientMessage::Unsubscribe {
408 chain_ids: chain_ids
409 .iter()
410 .map(|chain_id| chain_id.to_string())
411 .collect(),
412 };
413 let result = self.send(message).await?;
414 match result {
415 ServerResultMessage::Success(_) => Ok(()),
416 ServerResultMessage::Err(error) => Err(ClientError::WsRequestFailed(error)),
417 }
418 }
419
420 pub async fn submit_bid(
434 &self,
435 bid: api_types::bid::BidCreate,
436 ) -> Result<api_types::bid::BidResult, ClientError> {
437 let message = api_types::ws::ClientMessage::PostBid { bid };
438 let result = self.send(message).await?;
439 match result {
440 ServerResultMessage::Success(response) => {
441 let response = response.ok_or(ClientError::InvalidResponse(
442 "Invalid server response: Expected BidResult but got None.".to_string(),
443 ))?;
444 let api_types::ws::APIResponse::BidResult(response) = response;
445 Ok(response)
446 }
447 ServerResultMessage::Err(error) => Err(ClientError::WsRequestFailed(error)),
448 }
449 }
450
451 pub async fn cancel_bid(
465 &self,
466 bid_cancel: api_types::bid::BidCancel,
467 ) -> Result<(), ClientError> {
468 let message = api_types::ws::ClientMessage::CancelBid { data: bid_cancel };
469 let result = self.send(message).await?;
470 match result {
471 ServerResultMessage::Success(_) => Ok(()),
472 ServerResultMessage::Err(error) => Err(ClientError::WsRequestFailed(error)),
473 }
474 }
475}
476
477impl Client {
478 async fn decode<T: DeserializeOwned>(response: Response) -> Result<T, ClientError> {
479 match response.json().await {
480 Ok(DecodedResponse::Ok(response)) => Ok(response),
481 Ok(DecodedResponse::Err(response)) => Err(ClientError::RequestError(response.error)),
482 Err(e) => Err(ClientError::DecodeResponseFailed(e)),
483 }
484 }
485
486 async fn send<T: Serialize, R: DeserializeOwned>(
512 &self,
513 route: impl Routable,
514 query: Option<T>,
515 ) -> Result<R, ClientError> {
516 let properties = route.properties();
518 let url = self
519 .inner
520 .http_url
521 .join(properties.full_path.as_str())
522 .map_err(|e| ClientError::InvalidHttpUrl(e.to_string()))?;
523 let mut request = self.inner.client.request(properties.method, url);
524 if let Some(api_key) = self.inner.api_key.clone() {
525 request = request.bearer_auth(api_key);
526 }
527 if let Some(query) = query {
528 request = request.query(&query);
529 }
530 let response = request.send().await.map_err(ClientError::RequestFailed)?;
531 Client::decode(response).await
532 }
533
534 fn get_urls(config: ClientConfig) -> Result<(Url, Url), ClientError> {
535 let http_url = Url::parse(config.http_url.as_str())
536 .map_err(|e| ClientError::InvalidHttpUrl(e.to_string()))?;
537
538 if http_url.scheme() != "http" && http_url.scheme() != "https" {
539 return Err(ClientError::InvalidHttpUrl(format!(
540 "Invalid scheme {}",
541 http_url.scheme()
542 )));
543 }
544
545 let ws_url_string = if http_url.scheme() == "http" {
546 config.http_url.replace("http", "ws")
547 } else {
548 config.http_url.replace("https", "wss")
549 };
550 let ws_url = Url::parse(ws_url_string.as_str()).expect("Failed to parse ws url");
551
552 Ok((http_url, ws_url))
553 }
554
555 pub fn try_new(config: ClientConfig) -> Result<Self, ClientError> {
569 let (http_url, ws_url) = Self::get_urls(config.clone())?;
570 Ok(Self {
571 inner: Arc::new(ClientInner {
572 http_url,
573 ws_url,
574 api_key: config.api_key,
575 client: reqwest::Client::new(),
576 }),
577 })
578 }
579
580 pub async fn connect_websocket(&self) -> Result<WsClient, ClientError> {
594 let url = self
595 .inner
596 .ws_url
597 .join(api_types::ws::Route::Ws.properties().full_path.as_str())
598 .map_err(|e| ClientError::WsConnectFailed(e.to_string()))?;
599 let mut request = url
600 .as_str()
601 .into_client_request()
602 .map_err(|e| ClientError::WsConnectFailed(e.to_string()))?;
603 if let Some(api_key) = self.inner.api_key.clone() {
604 let bearer_token = format!("Bearer {}", api_key);
605 request.headers_mut().insert(
606 "Authorization",
607 bearer_token.parse().map_err(|_| {
608 ClientError::WsConnectFailed("Failed to parse api key".to_string())
609 })?,
610 );
611 }
612 let (ws_stream, _) = connect_async(request)
613 .await
614 .map_err(|e| ClientError::WsConnectFailed(e.to_string()))?;
615
616 let (request_sender, request_receiver) = mpsc::unbounded_channel();
617 let (update_sender, update_receiver) = broadcast::channel(1000);
618
619 Ok(WsClient {
620 inner: Arc::new(WsClientInner {
621 request_sender,
622 update_receiver,
623 request_id: RwLock::new(0),
624 ws: tokio::spawn(WsClient::run(ws_stream, request_receiver, update_sender)),
625 }),
626 })
627 }
628
629 pub async fn get_opportunities(
639 &self,
640 params: Option<GetOpportunitiesQueryParams>,
641 ) -> Result<Vec<Opportunity>, ClientError> {
642 self.send(Route::GetOpportunities, params).await
643 }
644
645 pub async fn new_bid(
661 &self,
662 opportunity: api_types::opportunity::OpportunitySvm,
663 params: svm::NewBidParams,
664 ) -> Result<api_types::bid::BidCreate, ClientError> {
665 let OpportunityParamsSvm::V1(opportunity_params) = opportunity.params.clone();
666 match opportunity_params.program {
667 OpportunityParamsV1ProgramSvm::Limo { .. } => {
668 let program_params = match params.program_params {
669 svm::ProgramParams::Limo(params) => Ok(params),
670 _ => Err(ClientError::NewBidError(
671 "Invalid program params for Limo opportunity".to_string(),
672 )),
673 }?;
674 let mut instructions = params.instructions;
675 instructions.push(svm::Svm::get_submit_bid_instruction(
676 GetSubmitBidInstructionParams {
677 chain_id: opportunity_params.chain_id.clone(),
678 amount: params.amount,
679 deadline: params.deadline,
680 searcher: params.searcher,
681 permission: program_params.permission,
682 router: program_params.router,
683 relayer_signer: params.relayer_signer,
684 fee_receiver_relayer: params.fee_receiver_relayer,
685 },
686 )?);
687 let mut transaction =
688 Transaction::new_with_payer(instructions.as_slice(), Some(¶ms.payer));
689 transaction
690 .try_partial_sign(¶ms.signers, params.block_hash)
691 .map_err(|e| {
692 ClientError::NewBidError(format!("Failed to sign transaction: {:?}", e))
693 })?;
694 Ok(BidCreate::Svm(BidCreateSvm::OnChain(BidCreateOnChainSvm {
695 chain_id: opportunity_params.chain_id.clone(),
696 transaction: transaction.into(),
697 slot: params.slot,
698 })))
699 }
700 OpportunityParamsV1ProgramSvm::Swap {
701 user_wallet_address,
702 user_mint_user_balance,
703 tokens,
704 fee_token,
705 router_account,
706 referral_fee_bps,
707 token_account_initialization_configs,
708 memo,
709 ..
710 } => {
711 let _ = match params.program_params {
712 svm::ProgramParams::Swap(params) => Ok(params),
713 _ => Err(ClientError::NewBidError(
714 "Invalid program params for swap opportunity".to_string(),
715 )),
716 }?;
717
718 let (searcher_token, user_token, user_amount_including_fees) = match tokens.tokens {
719 QuoteTokens::SearcherTokenSpecified {
720 searcher_token,
721 user_token,
722 ..
723 } => (
724 searcher_token,
725 user_token,
726 svm::Svm::get_bid_amount_including_fees(
727 &opportunity.params,
728 params.amount,
729 )?,
730 ),
731 QuoteTokens::UserTokenSpecified {
732 searcher_token,
733 user_token,
734 user_amount_including_fees,
735 ..
736 } => (searcher_token, user_token, user_amount_including_fees),
737 };
738 let (fee_token, fee_token_program) = match fee_token {
739 FeeToken::SearcherToken => (searcher_token, tokens.token_program_searcher),
740 FeeToken::UserToken => (user_token, tokens.token_program_user),
741 };
742 let mut instructions = params.instructions;
743 if let Some(memo) = memo {
744 instructions.push(svm::Svm::get_memo_instruction(memo));
745 }
746 instructions.extend(svm::Svm::get_swap_create_accounts_idempotent_instructions(
747 svm::GetSwapCreateAccountsIdempotentInstructionsParams {
748 searcher: params.searcher,
749 user: user_wallet_address,
750 searcher_token,
751 token_program_searcher: tokens.token_program_searcher,
752 mint_user: user_token,
753 token_program_user: tokens.token_program_user,
754 fee_token,
755 fee_token_program,
756 router_account,
757 fee_receiver_relayer: params.fee_receiver_relayer,
758 referral_fee_bps,
759 chain_id: opportunity_params.chain_id.clone(),
760 configs: token_account_initialization_configs.clone(),
761 },
762 ));
763 if user_token == native_mint::id() {
764 let user_amount_to_wrap = svm::Svm::get_user_amount_to_wrap(
765 user_amount_including_fees,
766 user_mint_user_balance,
767 &token_account_initialization_configs,
768 );
769 instructions.extend(svm::Svm::get_wrap_sol_instructions(
770 svm::GetWrapSolInstructionsParams {
771 payer: params.payer,
772 owner: user_wallet_address,
773 amount: user_amount_to_wrap,
774 create_ata: false,
775 },
776 )?);
777 }
778 instructions.push(svm::Svm::get_swap_instruction(GetSwapInstructionParams {
779 opportunity_params: opportunity.params,
780 bid_amount: params.amount,
781 deadline: params.deadline,
782 searcher: params.searcher,
783 fee_receiver_relayer: params.fee_receiver_relayer,
784 relayer_signer: params.relayer_signer,
785 })?);
786 if searcher_token == native_mint::id() || user_token == native_mint::id() {
787 instructions.push(svm::Svm::get_unwrap_sol_instruction(
788 svm::GetUnwrapSolInstructionParams {
789 owner: user_wallet_address,
790 },
791 )?)
792 }
793 let mut transaction =
794 Transaction::new_with_payer(instructions.as_slice(), Some(¶ms.payer));
795 transaction
796 .try_partial_sign(¶ms.signers, params.block_hash)
797 .map_err(|e| {
798 ClientError::NewBidError(format!("Failed to sign transaction: {:?}", e))
799 })?;
800 Ok(BidCreate::Svm(BidCreateSvm::Swap(BidCreateSwapSvm {
801 chain_id: opportunity_params.chain_id,
802 transaction: transaction.into(),
803 opportunity_id: opportunity.opportunity_id,
804 _type: BidCreateSwapSvmTag::Swap,
805 })))
806 }
807 }
808 }
809}