xapi_binance/common/ws/
api.rs

1use crate::common::{payload::BnWsApiPayload, response::BnWsApiResponse};
2use ezsockets::{Bytes, Client, ClientConfig, ClientExt, Error, Utf8Bytes};
3use std::collections::HashMap;
4use tokio::sync::oneshot;
5use xapi_shared::ws::{api::SharedWsApiTrait, error::SharedWsError};
6
7pub struct BnWsApi {
8    client: Client<Self>,
9    on_connect_tx: Option<oneshot::Sender<()>>,
10    resp_map: HashMap<String, oneshot::Sender<Result<BnWsApiResponse, SharedWsError>>>,
11}
12
13pub enum BnWsApiCall {
14    SendApi {
15        payload: BnWsApiPayload,
16        tx: oneshot::Sender<Result<BnWsApiResponse, SharedWsError>>,
17    },
18}
19
20#[async_trait::async_trait]
21impl ClientExt for BnWsApi {
22    type Call = BnWsApiCall;
23
24    async fn on_text(&mut self, text: Utf8Bytes) -> Result<(), Error> {
25        let msg = text.to_string();
26
27        if let Some(result) = self.recv_oneshot_resp(&text) {
28            return result.map_err(|err| err.into());
29        }
30
31        tracing::error!(?msg, "unhandled bn ws api response data");
32        Err(SharedWsError::AppError("unhandled bn ws api response data".to_string()).into())
33    }
34
35    async fn on_binary(&mut self, _bytes: Bytes) -> Result<(), Error> {
36        unimplemented!()
37    }
38
39    async fn on_call(&mut self, call: Self::Call) -> Result<(), Error> {
40        match call {
41            BnWsApiCall::SendApi { payload, tx } => self.send_oneshot(payload, tx)?,
42        }
43
44        Ok(())
45    }
46
47    async fn on_connect(&mut self) -> Result<(), Error> {
48        if let Some(tx) = self.on_connect_tx.take() {
49            tx.send(())
50                .inspect_err(|_| {
51                    tracing::error!("failed to send on_connect signal");
52                })
53                .map_err(|_| {
54                    SharedWsError::ChannelClosedError("first on connect channel closed".to_string())
55                })?;
56        }
57        Ok(())
58    }
59}
60
61impl SharedWsApiTrait<String, BnWsApiPayload, BnWsApiResponse> for BnWsApi {
62    fn get_client(&self) -> &Client<Self> {
63        &self.client
64    }
65
66    fn get_oneshot_tx_map(
67        &mut self,
68    ) -> &mut HashMap<String, oneshot::Sender<Result<BnWsApiResponse, SharedWsError>>> {
69        &mut self.resp_map
70    }
71}
72
73impl BnWsApi {
74    pub async fn connect(config: ClientConfig) -> Client<Self> {
75        let (on_connect_tx, on_connect_rx) = oneshot::channel();
76
77        let (client, future) = ezsockets::connect(
78            |client| Self {
79                client,
80                on_connect_tx: Some(on_connect_tx),
81                resp_map: Default::default(),
82            },
83            config,
84        )
85        .await;
86
87        tokio::spawn(async move {
88            future.await.inspect_err(|err| {
89                tracing::error!(?err, "bn ws client connection error");
90            })
91        });
92
93        _ = on_connect_rx.await;
94
95        client
96    }
97}