1use derive_more::Display;
2use futures::{stream::BoxStream, Stream, StreamExt};
3pub use indicator::{window::mode::tumbling::period::PeriodKind, Period};
4use indicator::{Tick, TickValue, Tickable};
5use positions::prelude::Str;
6use rust_decimal::Decimal;
7use serde::{Deserialize, Serialize};
8use std::{
9 fmt,
10 ops::{Bound, RangeBounds},
11};
12use time::OffsetDateTime;
13
14use exc_service::{ExchangeError, Request};
15
/// A boxed stream of candle results, tagged with a direction flag.
pub struct CandleStream {
    /// Direction flag: `true` when built via `new_forward`, `false` when built via
    /// `new_backward`.
    // NOTE(review): presumably "forward" means candles arrive in ascending timestamp
    // order — confirm against the producers of this stream.
    forward: bool,
    /// The type-erased underlying stream of `Result<Candle, ExchangeError>` items.
    stream: BoxStream<'static, Result<Candle, ExchangeError>>,
}
21
22impl CandleStream {
23 pub fn new_forward(
25 stream: impl Stream<Item = Result<Candle, ExchangeError>> + Send + 'static,
26 ) -> Self {
27 Self {
28 forward: true,
29 stream: stream.boxed(),
30 }
31 }
32
33 pub fn new_backward(
35 stream: impl Stream<Item = Result<Candle, ExchangeError>> + Send + 'static,
36 ) -> Self {
37 Self {
38 forward: false,
39 stream: stream.boxed(),
40 }
41 }
42
43 pub fn is_forward(&self) -> bool {
45 self.forward
46 }
47}
48
49impl Stream for CandleStream {
50 type Item = Result<Candle, ExchangeError>;
51
52 fn poll_next(
53 mut self: std::pin::Pin<&mut Self>,
54 cx: &mut std::task::Context<'_>,
55 ) -> std::task::Poll<Option<Self::Item>> {
56 self.stream.poll_next_unpin(cx)
57 }
58
59 fn size_hint(&self) -> (usize, Option<usize>) {
60 self.stream.size_hint()
61 }
62}
63
#[derive(Debug, Clone)]
/// A request for the candles of one instrument and period within a time range.
///
/// Displayed as `{inst}-{period}-{start}, {end}`, where the bounds are rendered with
/// interval brackets (`[`/`]` for inclusive, `(`/`)` for exclusive or unbounded).
pub struct QueryCandles {
    /// The instrument being queried (exposed as `&str` through `inst()`).
    pub inst: Str,
    /// The candle period; `new` also uses its UTC offset to normalize the bounds.
    pub period: Period,
    /// Lower bound of the queried time range.
    pub start: Bound<OffsetDateTime>,
    /// Upper bound of the queried time range.
    pub end: Bound<OffsetDateTime>,
}
76
77fn fmt_ts_start_bound(bound: &Bound<OffsetDateTime>) -> String {
78 match bound {
79 Bound::Unbounded => "(".to_string(),
80 Bound::Excluded(ts) => format!("({ts}"),
81 Bound::Included(ts) => format!("[{ts}"),
82 }
83}
84
85fn fmt_ts_end_bound(bound: &Bound<OffsetDateTime>) -> String {
86 match bound {
87 Bound::Unbounded => ")".to_string(),
88 Bound::Excluded(ts) => format!("{ts})"),
89 Bound::Included(ts) => format!("{ts}]"),
90 }
91}
92
93impl fmt::Display for QueryCandles {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 write!(
96 f,
97 "{}-{}-{}, {}",
98 self.inst,
99 self.period,
100 fmt_ts_start_bound(&self.start),
101 fmt_ts_end_bound(&self.end)
102 )
103 }
104}
105
106impl QueryCandles {
107 pub fn new<R>(inst: impl AsRef<str>, period: Period, range: R) -> Self
109 where
110 R: RangeBounds<OffsetDateTime>,
111 {
112 let offset = period.utc_offset();
113 let start = match range.start_bound() {
114 Bound::Unbounded => Bound::Unbounded,
115 Bound::Included(&t) => Bound::Included(t.to_offset(offset)),
116 Bound::Excluded(&t) => Bound::Excluded(t.to_offset(offset)),
117 };
118 let end = match range.end_bound() {
119 Bound::Unbounded => Bound::Unbounded,
120 Bound::Included(&t) => Bound::Included(t.to_offset(offset)),
121 Bound::Excluded(&t) => Bound::Excluded(t.to_offset(offset)),
122 };
123 Self {
124 inst: Str::new(inst),
125 period,
126 start,
127 end,
128 }
129 }
130
131 pub fn inst(&self) -> &str {
133 self.inst.as_str()
134 }
135
136 pub fn period(&self) -> Period {
138 self.period
139 }
140
141 pub fn is_empty(&self) -> bool {
143 match (self.start_bound(), self.end_bound()) {
144 (Bound::Unbounded, _) => false,
145 (_, Bound::Unbounded) => false,
146 (Bound::Included(start), Bound::Included(end)) => *start > *end,
147 (Bound::Included(start), Bound::Excluded(end)) => *start >= *end,
148 (Bound::Excluded(start), Bound::Included(end)) => *start >= *end,
149 (Bound::Excluded(start), Bound::Excluded(end)) => *start >= *end,
150 }
151 }
152}
153
154impl RangeBounds<OffsetDateTime> for QueryCandles {
155 fn start_bound(&self) -> Bound<&OffsetDateTime> {
156 match &self.start {
157 Bound::Unbounded => Bound::Unbounded,
158 Bound::Included(t) => Bound::Included(t),
159 Bound::Excluded(t) => Bound::Excluded(t),
160 }
161 }
162
163 fn end_bound(&self) -> Bound<&OffsetDateTime> {
164 match &self.end {
165 Bound::Unbounded => Bound::Unbounded,
166 Bound::Included(t) => Bound::Included(t),
167 Bound::Excluded(t) => Bound::Excluded(t),
168 }
169 }
170}
171
/// A [`QueryCandles`] request is answered with a [`CandleStream`].
impl Request for QueryCandles {
    type Response = CandleStream;
}
175
#[derive(Debug, Clone)]
/// A [`QueryCandles`] paired with a `last` count.
///
/// Displayed as `{query}-(-{last})`.
// NOTE(review): presumably this asks for only the last `last` candles of the range —
// confirm against the services that handle this request.
pub struct QueryLastCandles {
    /// The underlying instrument/period/range query.
    pub query: QueryCandles,
    /// The count attached to the query (presumably how many trailing candles to return).
    pub last: usize,
}
185
186impl fmt::Display for QueryLastCandles {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 write!(f, "{}-(-{})", self.query, self.last)
189 }
190}
191
192impl QueryLastCandles {
193 pub fn new<R>(inst: &str, period: Period, range: R, last: usize) -> Self
195 where
196 R: RangeBounds<OffsetDateTime>,
197 {
198 let query = QueryCandles::new(inst, period, range);
199 Self { query, last }
200 }
201
202 pub fn last(&self) -> usize {
204 self.last
205 }
206
207 pub fn query(&self) -> &QueryCandles {
209 &self.query
210 }
211}
212
/// A [`QueryLastCandles`] request is answered with a [`CandleStream`].
impl Request for QueryLastCandles {
    type Response = CandleStream;
}
216
#[derive(Debug, Clone)]
/// A [`QueryCandles`] paired with a `first` count.
///
/// Displayed as `{query}-({first})`.
// NOTE(review): presumably this asks for only the first `first` candles of the range —
// confirm against the services that handle this request.
pub struct QueryFirstCandles {
    /// The underlying instrument/period/range query.
    pub query: QueryCandles,
    /// The count attached to the query (presumably how many leading candles to return).
    pub first: usize,
}
226
227impl fmt::Display for QueryFirstCandles {
228 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229 write!(f, "{}-({})", self.query, self.first)
230 }
231}
232
233impl QueryFirstCandles {
234 pub fn new<R>(inst: &str, period: Period, range: R, first: usize) -> Self
236 where
237 R: RangeBounds<OffsetDateTime>,
238 {
239 let query = QueryCandles::new(inst, period, range);
240 Self { query, first }
241 }
242
243 pub fn first(&self) -> usize {
245 self.first
246 }
247
248 pub fn query(&self) -> &QueryCandles {
250 &self.query
251 }
252}
253
/// A [`QueryFirstCandles`] request is answered with a [`CandleStream`].
impl Request for QueryFirstCandles {
    type Response = CandleStream;
}
257
#[derive(Debug, Clone, Serialize, Deserialize, Display)]
/// A single candle: a timestamp plus open/high/low/close/volume values.
///
/// Displayed as `ts={ts} ohlcv=[{open}, {high}, {low}, {close}, {volume}]`.
#[display(fmt = "ts={ts} ohlcv=[{open}, {high}, {low}, {close}, {volume}]")]
pub struct Candle {
    /// Timestamp of the candle; (de)serialized as an RFC 3339 string.
    // NOTE(review): whether this marks the open or the close of the period is not
    // visible here — confirm with the producers.
    #[serde(with = "time::serde::rfc3339")]
    pub ts: OffsetDateTime,
    /// The open value.
    pub open: Decimal,
    /// The high value.
    pub high: Decimal,
    /// The low value.
    pub low: Decimal,
    /// The close value.
    pub close: Decimal,
    /// The volume; falls back to `Decimal::default()` when absent from the
    /// deserialized input.
    #[serde(default)]
    pub volume: Decimal,
}
277
278impl Tickable for Candle {
279 type Value = Self;
280
281 fn tick(&self) -> Tick {
282 Tick::new(self.ts)
283 }
284
285 fn value(&self) -> &Self::Value {
286 self
287 }
288
289 fn into_tick_value(self) -> TickValue<Self::Value> {
290 TickValue::new(self.ts, self)
291 }
292}