use crate::common::{payload::BnWsApiPayload, response::BnWsApiResponse};
use ezsockets::{Bytes, Client, ClientConfig, ClientExt, Error, Utf8Bytes};
use std::collections::HashMap;
use tokio::sync::oneshot;
use xapi_shared::ws::{api::SharedWsApiTrait, error::SharedWsError};
/// State of the "bn" WebSocket API client that ezsockets drives through the
/// `ClientExt` implementation on this type.
pub struct BnWsApi {
/// Handle to the ezsockets client; returned by `get_client`.
client: Client<Self>,
/// One-time signal fired from `on_connect`. It is `take()`n on first use, so
/// later `on_connect` calls find `None` and do not signal again.
on_connect_tx: Option<oneshot::Sender<()>>,
/// Pending response senders, handed out by `get_oneshot_tx_map`.
/// NOTE(review): the `String` key is presumably a request id — confirm against
/// `SharedWsApiTrait`.
resp_map: HashMap<String, oneshot::Sender<Result<BnWsApiResponse, SharedWsError>>>,
}
/// Calls accepted by `BnWsApi::on_call` (this is the `ClientExt::Call` type).
pub enum BnWsApiCall {
/// Send an API request; `on_call` forwards both fields to `send_oneshot`.
SendApi {
/// Request payload to send.
payload: BnWsApiPayload,
/// Channel on which the response (or a `SharedWsError`) is delivered.
tx: oneshot::Sender<Result<BnWsApiResponse, SharedWsError>>,
},
}
#[async_trait::async_trait]
impl ClientExt for BnWsApi {
type Call = BnWsApiCall;
async fn on_text(&mut self, text: Utf8Bytes) -> Result<(), Error> {
let msg = text.to_string();
if let Some(result) = self.recv_oneshot_resp(&text) {
return result.map_err(|err| err.into());
}
tracing::error!(?msg, "unhandled bn ws api response data");
Err(SharedWsError::AppError("unhandled bn ws api response data".to_string()).into())
}
async fn on_binary(&mut self, _bytes: Bytes) -> Result<(), Error> {
unimplemented!()
}
async fn on_call(&mut self, call: Self::Call) -> Result<(), Error> {
match call {
BnWsApiCall::SendApi { payload, tx } => self.send_oneshot(payload, tx)?,
}
Ok(())
}
async fn on_connect(&mut self) -> Result<(), Error> {
if let Some(tx) = self.on_connect_tx.take() {
tx.send(())
.inspect_err(|_| {
tracing::error!("failed to send on_connect signal");
})
.map_err(|_| {
SharedWsError::ChannelClosedError("first on connect channel closed".to_string())
})?;
}
Ok(())
}
}
/// Accessors required by `SharedWsApiTrait`.
/// NOTE(review): `send_oneshot` / `recv_oneshot_resp`, used in the `ClientExt`
/// impl, are not defined in this file and presumably come from this trait's
/// provided methods — confirm in `xapi_shared`.
impl SharedWsApiTrait<String, BnWsApiPayload, BnWsApiResponse> for BnWsApi {
/// Returns a shared reference to the stored ezsockets client handle.
fn get_client(&self) -> &Client<Self> {
&self.client
}
/// Returns mutable access to the map of pending response senders.
fn get_oneshot_tx_map(
&mut self,
) -> &mut HashMap<String, oneshot::Sender<Result<BnWsApiResponse, SharedWsError>>> {
&mut self.resp_map
}
}
impl BnWsApi {
    /// Starts the WebSocket client and waits for the first `on_connect` signal.
    ///
    /// The connection future returned by `ezsockets::connect` is spawned onto
    /// the Tokio runtime as a detached task; if it finishes with an error, the
    /// error is logged.
    ///
    /// The returned handle is produced even when the signal never arrives
    /// (the sender was dropped before `on_connect` ran); that case is logged
    /// as a warning so it is not silent.
    pub async fn connect(config: ClientConfig) -> Client<Self> {
        let (on_connect_tx, on_connect_rx) = oneshot::channel();
        let (client, future) = ezsockets::connect(
            |client| Self {
                client,
                on_connect_tx: Some(on_connect_tx),
                resp_map: Default::default(),
            },
            config,
        )
        .await;

        tokio::spawn(async move {
            future.await.inspect_err(|err| {
                tracing::error!(?err, "bn ws client connection error");
            })
        });

        // A receive error means the sender was dropped without firing, i.e.
        // `on_connect` never signalled. Surface it instead of discarding it.
        if on_connect_rx.await.is_err() {
            tracing::warn!("bn ws client on_connect signal dropped before first connect");
        }
        client
    }
}