1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
use exc_core::types::candle::{Candle, QueryLastCandles};
use exc_core::Adaptor;
use exc_core::ExchangeError;
use std::ops::RangeBounds;

use crate::http::types::request::history_candles::HistoryCandles;
use crate::http::types::request::Get;
use crate::http::types::response::ResponseData;
use crate::util::timestamp::millis_to_ts;
use crate::util::{
    period::period_to_bar,
    timestamp::{end_bound_to_millis, start_bound_to_millis},
};
use async_stream::stream;
use futures::StreamExt;

use super::HttpRequest;

impl Adaptor<QueryLastCandles> for HttpRequest {
    fn from_request(req: QueryLastCandles) -> Result<Self, exc_core::ExchangeError>
    where
        Self: Sized,
    {
        let limit = req.last();
        let query = req.query();
        // from before to after.
        let start = start_bound_to_millis(query.start_bound());
        let end = end_bound_to_millis(query.end_bound());
        let req = Self::Get(Get::HistoryCandles(HistoryCandles {
            inst_id: query.inst().to_string(),
            after: end,
            before: start,
            bar: period_to_bar(&query.period()),
            limit: Some(limit),
        }));
        Ok(req)
    }

    fn into_response(
        resp: Self::Response,
    ) -> Result<<QueryLastCandles as exc_core::Request>::Response, exc_core::ExchangeError> {
        let stream = stream! {
                for data in resp.data {
        trace!("received a data: {data:?}");
            if let ResponseData::Candle(c) = data {
                if let Some(ts) = millis_to_ts(c.0) {
                yield Ok(Candle {
                    ts,
                    open: c.1,
                    high: c.2,
                    low: c.3,
                    close: c.4,
                    volume: c.5,
                });
                } else {
                yield Err(ExchangeError::Other(anyhow::anyhow!("cannot parse ts")));
                }
            }
            }
                };
        Ok(stream.boxed())
    }
}