exc_okx/http/types/adaptations/
candle.rs1use exc_core::types::candle::{Candle, QueryLastCandles};
2use exc_core::types::CandleStream;
3use exc_core::Adaptor;
4use exc_core::ExchangeError;
5use std::ops::RangeBounds;
6
7use crate::http::types::request::history_candles::HistoryCandles;
8use crate::http::types::request::Get;
9use crate::http::types::response::ResponseData;
10use crate::utils::timestamp::millis_to_ts;
11use crate::utils::{
12 period::period_to_bar,
13 timestamp::{end_bound_to_millis, start_bound_to_millis},
14};
15use async_stream::stream;
16
17use super::HttpRequest;
18
19impl Adaptor<QueryLastCandles> for HttpRequest {
20 fn from_request(req: QueryLastCandles) -> Result<Self, exc_core::ExchangeError>
21 where
22 Self: Sized,
23 {
24 let limit = req.last();
25 let query = req.query();
26 let start = start_bound_to_millis(query.start_bound());
28 let end = end_bound_to_millis(query.end_bound());
29 let req = Self::Get(Get::HistoryCandles(HistoryCandles {
30 inst_id: query.inst().to_string(),
31 after: end,
32 before: start,
33 bar: period_to_bar(&query.period()),
34 limit: Some(limit),
35 }));
36 Ok(req)
37 }
38
39 fn into_response(
40 resp: Self::Response,
41 ) -> Result<<QueryLastCandles as exc_core::Request>::Response, exc_core::ExchangeError> {
42 let stream = stream! {
43 for data in resp.data {
44 trace!("received a data: {data:?}");
45 if let ResponseData::Candle(c) = data {
46 if let Some(ts) = millis_to_ts(c.0) {
47 yield Ok(Candle {
48 ts,
49 open: c.1,
50 high: c.2,
51 low: c.3,
52 close: c.4,
53 volume: c.5,
54 });
55 } else {
56 yield Err(ExchangeError::Other(anyhow::anyhow!("cannot parse ts")));
57 }
58 }
59 }
60 };
61 Ok(CandleStream::new_backward(stream))
62 }
63}