Skip to main content

exc_okx/websocket/transport/
channel.rs

1use crate::error::OkxError;
2use crate::websocket::types::{request::Request, response::Response};
3use exc_core::ExchangeError;
4use futures::{future::BoxFuture, FutureExt, TryFutureExt};
5use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
6
7/// Okx websocket channel.
8#[derive(Clone)]
9pub struct Channel {
10    pub(crate) svc: Buffer<BoxService<Request, Response, OkxError>, Request>,
11}
12
13impl Channel {
14    /// Send request.
15    pub async fn request(
16        &mut self,
17        request: Request,
18    ) -> Result<<Self as Service<Request>>::Future, ExchangeError> {
19        ServiceExt::<Request>::ready(self).await?;
20        let fut = self.call(request);
21        Ok(fut)
22    }
23}
24
25impl tower::Service<Request> for Channel {
26    type Response = Response;
27    type Error = ExchangeError;
28    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
29
30    fn poll_ready(
31        &mut self,
32        cx: &mut std::task::Context<'_>,
33    ) -> std::task::Poll<Result<(), Self::Error>> {
34        self.svc
35            .poll_ready(cx)
36            .map_err(OkxError::Buffer)
37            .map_err(|err| ExchangeError::Other(err.into()))
38    }
39
40    fn call(&mut self, req: Request) -> Self::Future {
41        self.svc
42            .call(req)
43            .map_err(OkxError::Buffer)
44            .map_err(|err| ExchangeError::Other(err.into()))
45            .boxed()
46    }
47}