exc_types/
candle.rs

1use derive_more::Display;
2use futures::{stream::BoxStream, Stream, StreamExt};
3pub use indicator::{window::mode::tumbling::period::PeriodKind, Period};
4use indicator::{Tick, TickValue, Tickable};
5use positions::prelude::Str;
6use rust_decimal::Decimal;
7use serde::{Deserialize, Serialize};
8use std::{
9    fmt,
10    ops::{Bound, RangeBounds},
11};
12use time::OffsetDateTime;
13
14use exc_service::{ExchangeError, Request};
15
16/// Candle Stream.
17pub struct CandleStream {
18    forward: bool,
19    stream: BoxStream<'static, Result<Candle, ExchangeError>>,
20}
21
22impl CandleStream {
23    /// Create a new candle stream that produce candles forward.
24    pub fn new_forward(
25        stream: impl Stream<Item = Result<Candle, ExchangeError>> + Send + 'static,
26    ) -> Self {
27        Self {
28            forward: true,
29            stream: stream.boxed(),
30        }
31    }
32
33    /// Create a new candle stream that produce candles backward.
34    pub fn new_backward(
35        stream: impl Stream<Item = Result<Candle, ExchangeError>> + Send + 'static,
36    ) -> Self {
37        Self {
38            forward: false,
39            stream: stream.boxed(),
40        }
41    }
42
43    /// Is forward.
44    pub fn is_forward(&self) -> bool {
45        self.forward
46    }
47}
48
49impl Stream for CandleStream {
50    type Item = Result<Candle, ExchangeError>;
51
52    fn poll_next(
53        mut self: std::pin::Pin<&mut Self>,
54        cx: &mut std::task::Context<'_>,
55    ) -> std::task::Poll<Option<Self::Item>> {
56        self.stream.poll_next_unpin(cx)
57    }
58
59    fn size_hint(&self) -> (usize, Option<usize>) {
60        self.stream.size_hint()
61    }
62}
63
64/// Query candles.
65#[derive(Debug, Clone)]
66pub struct QueryCandles {
67    /// Instrument.
68    pub inst: Str,
69    /// Period.
70    pub period: Period,
71    /// Start.
72    pub start: Bound<OffsetDateTime>,
73    /// End.
74    pub end: Bound<OffsetDateTime>,
75}
76
77fn fmt_ts_start_bound(bound: &Bound<OffsetDateTime>) -> String {
78    match bound {
79        Bound::Unbounded => "(".to_string(),
80        Bound::Excluded(ts) => format!("({ts}"),
81        Bound::Included(ts) => format!("[{ts}"),
82    }
83}
84
85fn fmt_ts_end_bound(bound: &Bound<OffsetDateTime>) -> String {
86    match bound {
87        Bound::Unbounded => ")".to_string(),
88        Bound::Excluded(ts) => format!("{ts})"),
89        Bound::Included(ts) => format!("{ts}]"),
90    }
91}
92
93impl fmt::Display for QueryCandles {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        write!(
96            f,
97            "{}-{}-{}, {}",
98            self.inst,
99            self.period,
100            fmt_ts_start_bound(&self.start),
101            fmt_ts_end_bound(&self.end)
102        )
103    }
104}
105
106impl QueryCandles {
107    /// Create a new query.
108    pub fn new<R>(inst: impl AsRef<str>, period: Period, range: R) -> Self
109    where
110        R: RangeBounds<OffsetDateTime>,
111    {
112        let offset = period.utc_offset();
113        let start = match range.start_bound() {
114            Bound::Unbounded => Bound::Unbounded,
115            Bound::Included(&t) => Bound::Included(t.to_offset(offset)),
116            Bound::Excluded(&t) => Bound::Excluded(t.to_offset(offset)),
117        };
118        let end = match range.end_bound() {
119            Bound::Unbounded => Bound::Unbounded,
120            Bound::Included(&t) => Bound::Included(t.to_offset(offset)),
121            Bound::Excluded(&t) => Bound::Excluded(t.to_offset(offset)),
122        };
123        Self {
124            inst: Str::new(inst),
125            period,
126            start,
127            end,
128        }
129    }
130
131    /// Get Instrument.
132    pub fn inst(&self) -> &str {
133        self.inst.as_str()
134    }
135
136    /// Get period.
137    pub fn period(&self) -> Period {
138        self.period
139    }
140
141    /// Is empty.
142    pub fn is_empty(&self) -> bool {
143        match (self.start_bound(), self.end_bound()) {
144            (Bound::Unbounded, _) => false,
145            (_, Bound::Unbounded) => false,
146            (Bound::Included(start), Bound::Included(end)) => *start > *end,
147            (Bound::Included(start), Bound::Excluded(end)) => *start >= *end,
148            (Bound::Excluded(start), Bound::Included(end)) => *start >= *end,
149            (Bound::Excluded(start), Bound::Excluded(end)) => *start >= *end,
150        }
151    }
152}
153
154impl RangeBounds<OffsetDateTime> for QueryCandles {
155    fn start_bound(&self) -> Bound<&OffsetDateTime> {
156        match &self.start {
157            Bound::Unbounded => Bound::Unbounded,
158            Bound::Included(t) => Bound::Included(t),
159            Bound::Excluded(t) => Bound::Excluded(t),
160        }
161    }
162
163    fn end_bound(&self) -> Bound<&OffsetDateTime> {
164        match &self.end {
165            Bound::Unbounded => Bound::Unbounded,
166            Bound::Included(t) => Bound::Included(t),
167            Bound::Excluded(t) => Bound::Excluded(t),
168        }
169    }
170}
171
172impl Request for QueryCandles {
173    type Response = CandleStream;
174}
175
176/// Query last `n` candles in range.
177/// Return a candle stream that produce the last `last` candles backward.
178#[derive(Debug, Clone)]
179pub struct QueryLastCandles {
180    /// Query.
181    pub query: QueryCandles,
182    /// Last.
183    pub last: usize,
184}
185
186impl fmt::Display for QueryLastCandles {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        write!(f, "{}-(-{})", self.query, self.last)
189    }
190}
191
192impl QueryLastCandles {
193    /// Create a new query.
194    pub fn new<R>(inst: &str, period: Period, range: R, last: usize) -> Self
195    where
196        R: RangeBounds<OffsetDateTime>,
197    {
198        let query = QueryCandles::new(inst, period, range);
199        Self { query, last }
200    }
201
202    /// Get last.
203    pub fn last(&self) -> usize {
204        self.last
205    }
206
207    /// Get query.
208    pub fn query(&self) -> &QueryCandles {
209        &self.query
210    }
211}
212
213impl Request for QueryLastCandles {
214    type Response = CandleStream;
215}
216
217/// Query first `n` candles in range.
218/// Return a candle stream that produce the first `fisrt` candles forward.
219#[derive(Debug, Clone)]
220pub struct QueryFirstCandles {
221    /// Query.
222    pub query: QueryCandles,
223    /// First.
224    pub first: usize,
225}
226
227impl fmt::Display for QueryFirstCandles {
228    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229        write!(f, "{}-({})", self.query, self.first)
230    }
231}
232
233impl QueryFirstCandles {
234    /// Create a new query.
235    pub fn new<R>(inst: &str, period: Period, range: R, first: usize) -> Self
236    where
237        R: RangeBounds<OffsetDateTime>,
238    {
239        let query = QueryCandles::new(inst, period, range);
240        Self { query, first }
241    }
242
243    /// Get first.
244    pub fn first(&self) -> usize {
245        self.first
246    }
247
248    /// Get query.
249    pub fn query(&self) -> &QueryCandles {
250        &self.query
251    }
252}
253
254impl Request for QueryFirstCandles {
255    type Response = CandleStream;
256}
257
258/// Candle (OHLCV).
259#[derive(Debug, Clone, Serialize, Deserialize, Display)]
260#[display(fmt = "ts={ts} ohlcv=[{open}, {high}, {low}, {close}, {volume}]")]
261pub struct Candle {
262    /// Timestamp.
263    #[serde(with = "time::serde::rfc3339")]
264    pub ts: OffsetDateTime,
265    /// The open price.
266    pub open: Decimal,
267    /// The highest price.
268    pub high: Decimal,
269    /// The lowest price.
270    pub low: Decimal,
271    /// The last price.
272    pub close: Decimal,
273    /// The volume.
274    #[serde(default)]
275    pub volume: Decimal,
276}
277
278impl Tickable for Candle {
279    type Value = Self;
280
281    fn tick(&self) -> Tick {
282        Tick::new(self.ts)
283    }
284
285    fn value(&self) -> &Self::Value {
286        self
287    }
288
289    fn into_tick_value(self) -> TickValue<Self::Value> {
290        TickValue::new(self.ts, self)
291    }
292}