xapi_binance/common/ws/
api.rs1use crate::common::{payload::BnWsApiPayload, response::BnWsApiResponse};
2use ezsockets::{Bytes, Client, ClientConfig, ClientExt, Error, Utf8Bytes};
3use std::collections::HashMap;
4use tokio::sync::oneshot;
5use xapi_shared::ws::{api::SharedWsApiTrait, error::SharedWsError};
6
7pub struct BnWsApi {
8 client: Client<Self>,
9 on_connect_tx: Option<oneshot::Sender<()>>,
10 resp_map: HashMap<String, oneshot::Sender<Result<BnWsApiResponse, SharedWsError>>>,
11}
12
13pub enum BnWsApiCall {
14 SendApi {
15 payload: BnWsApiPayload,
16 tx: oneshot::Sender<Result<BnWsApiResponse, SharedWsError>>,
17 },
18}
19
20#[async_trait::async_trait]
21impl ClientExt for BnWsApi {
22 type Call = BnWsApiCall;
23
24 async fn on_text(&mut self, text: Utf8Bytes) -> Result<(), Error> {
25 let msg = text.to_string();
26
27 if let Some(result) = self.recv_oneshot_resp(&text) {
28 return result.map_err(|err| err.into());
29 }
30
31 tracing::error!(?msg, "unhandled bn ws api response data");
32 Err(SharedWsError::AppError("unhandled bn ws api response data".to_string()).into())
33 }
34
35 async fn on_binary(&mut self, _bytes: Bytes) -> Result<(), Error> {
36 unimplemented!()
37 }
38
39 async fn on_call(&mut self, call: Self::Call) -> Result<(), Error> {
40 match call {
41 BnWsApiCall::SendApi { payload, tx } => self.send_oneshot(payload, tx)?,
42 }
43
44 Ok(())
45 }
46
47 async fn on_connect(&mut self) -> Result<(), Error> {
48 if let Some(tx) = self.on_connect_tx.take() {
49 tx.send(())
50 .inspect_err(|_| {
51 tracing::error!("failed to send on_connect signal");
52 })
53 .map_err(|_| {
54 SharedWsError::ChannelClosedError("first on connect channel closed".to_string())
55 })?;
56 }
57 Ok(())
58 }
59}
60
61impl SharedWsApiTrait<String, BnWsApiPayload, BnWsApiResponse> for BnWsApi {
62 fn get_client(&self) -> &Client<Self> {
63 &self.client
64 }
65
66 fn get_oneshot_tx_map(
67 &mut self,
68 ) -> &mut HashMap<String, oneshot::Sender<Result<BnWsApiResponse, SharedWsError>>> {
69 &mut self.resp_map
70 }
71}
72
73impl BnWsApi {
74 pub async fn connect(config: ClientConfig) -> Client<Self> {
75 let (on_connect_tx, on_connect_rx) = oneshot::channel();
76
77 let (client, future) = ezsockets::connect(
78 |client| Self {
79 client,
80 on_connect_tx: Some(on_connect_tx),
81 resp_map: Default::default(),
82 },
83 config,
84 )
85 .await;
86
87 tokio::spawn(async move {
88 future.await.inspect_err(|err| {
89 tracing::error!(?err, "bn ws client connection error");
90 })
91 });
92
93 _ = on_connect_rx.await;
94
95 client
96 }
97}