1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use chrono::DateTime;
use chrono::prelude::*;
use chrono::Utc;
use log::{error, info, warn};
use crate::api::liquid::LiquidAPI;
use crate::api::liquid::QnGetExecution;
use crate::downloader::Downloader;
use crate::downloader::id::datetime::DateTimeID;
use crate::error::Error;
use crate::error::Result;
use crate::writer::Trade;
use crate::writer::Writer;
/// Downloader for Liquid executions between a start and an end time.
///
/// Holds the requested time window and the API client used to fetch
/// executions from it.
#[derive(Debug)]
pub struct LiquidDownloader {
/// Beginning of the requested window; returned by `start_id`.
start: DateTime<Utc>,
/// End of the requested window; returned by `end_id`.
end: DateTime<Utc>,
/// Liquid API client that `fetch` calls to retrieve executions.
api: LiquidAPI,
}
impl LiquidDownloader {
    /// Maximum number of executions requested from the API per call.
    ///
    /// The same value is used to detect a completely full page when the
    /// fetched trades are written out.
    fn limit(&self) -> usize {
        const PAGE_SIZE: usize = 1000;
        PAGE_SIZE
    }

    /// Builds a downloader covering `start` through `end`, backed by a
    /// freshly constructed [`LiquidAPI`] client.
    pub fn new(start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
        let api = LiquidAPI::new();
        Self { start, end, api }
    }
}
impl Downloader for LiquidDownloader {
type IDT = DateTime<Utc>;
type ID = DateTimeID;
type RAW = QnGetExecution;
fn start_id(&self) -> DateTime<Utc> {
self.start
}
fn end_id(&self) -> DateTime<Utc> {
self.end
}
fn continue_condition(&self, current: &DateTime<Utc>, end: &DateTime<Utc>) -> bool {
current <= end
}
fn fetch(&self, c: &Self::IDT) -> Result<Vec<QnGetExecution>> {
self.api.executions(c.timestamp() as u64, self.limit())
}
fn convert(&self, v: &QnGetExecution) -> Result<Trade> {
let quantity = v.quantity.parse::<f32>()?;
let price = v.price.parse::<f32>()?;
let created_at = Utc.timestamp(v.created_at as i64, 0);
Ok(Trade {
id: format!("{}", v.id),
quantity,
price,
traded_at: created_at,
})
}
fn output(&self, u: Vec<Trade>, writer: &mut impl Writer) -> Result<Self::IDT> {
match u.last() {
Some(last) => {
let last_ts = last.traded_at;
let orig_len = u.len();
let without_last: Vec<Trade> =
u.into_iter().filter(|e| e.traded_at != last_ts).collect();
if orig_len == self.limit() && without_last.is_empty() {
error!(
"more than {} executions at the same timestamp",
self.limit()
);
Err(Error::CannotFetchTradesAccurately)
} else {
writer.write(without_last.as_slice()).map(|num| {
info!("wrote {} data", num);
last_ts
})
}
}
None => {
warn!("no output");
Err(Error::NotFound)
}
}
}
fn sleep_millis(&self) -> u64 {
1100
}
}