exc_okx/http/types/adaptations/
candle.rs

1use exc_core::types::candle::{Candle, QueryLastCandles};
2use exc_core::types::CandleStream;
3use exc_core::Adaptor;
4use exc_core::ExchangeError;
5use std::ops::RangeBounds;
6
7use crate::http::types::request::history_candles::HistoryCandles;
8use crate::http::types::request::Get;
9use crate::http::types::response::ResponseData;
10use crate::utils::timestamp::millis_to_ts;
11use crate::utils::{
12    period::period_to_bar,
13    timestamp::{end_bound_to_millis, start_bound_to_millis},
14};
15use async_stream::stream;
16
17use super::HttpRequest;
18
19impl Adaptor<QueryLastCandles> for HttpRequest {
20    fn from_request(req: QueryLastCandles) -> Result<Self, exc_core::ExchangeError>
21    where
22        Self: Sized,
23    {
24        let limit = req.last();
25        let query = req.query();
26        // from before to after.
27        let start = start_bound_to_millis(query.start_bound());
28        let end = end_bound_to_millis(query.end_bound());
29        let req = Self::Get(Get::HistoryCandles(HistoryCandles {
30            inst_id: query.inst().to_string(),
31            after: end,
32            before: start,
33            bar: period_to_bar(&query.period()),
34            limit: Some(limit),
35        }));
36        Ok(req)
37    }
38
39    fn into_response(
40        resp: Self::Response,
41    ) -> Result<<QueryLastCandles as exc_core::Request>::Response, exc_core::ExchangeError> {
42        let stream = stream! {
43                for data in resp.data {
44        trace!("received a data: {data:?}");
45            if let ResponseData::Candle(c) = data {
46                if let Some(ts) = millis_to_ts(c.0) {
47                yield Ok(Candle {
48                    ts,
49                    open: c.1,
50                    high: c.2,
51                    low: c.3,
52                    close: c.4,
53                    volume: c.5,
54                });
55                } else {
56                yield Err(ExchangeError::Other(anyhow::anyhow!("cannot parse ts")));
57                }
58            }
59            }
60                };
61        Ok(CandleStream::new_backward(stream))
62    }
63}