1use crate::utils::{DateUtils, Envs};
2
3use async_trait::async_trait;
4use core::convert::From;
5use polars::export::chrono::NaiveDate;
6use polars::frame::row::Row;
7use polars::frame::DataFrame;
8use polars::io::{SerReader, SerWriter};
9use polars::prelude::{CsvReader, CsvWriter, JsonFormat, JsonReader, Schema};
10use reqwest::Request;
11use serde::{Deserialize, Serialize};
12use std::fs::File;
13use std::io::Cursor;
14use std::path::Path;
15use utils::IoUtils;
16
17pub mod cffex;
18pub mod const_vars;
19pub mod sina;
20pub mod utils;
21
22#[derive(Debug, Clone, Default)]
29pub struct DataResult<T> {
30 pub data_id: Option<String>,
31 pub data: Option<T>,
32}
33
34#[derive(Debug, Deserialize, Serialize)]
38pub struct StockData {
39 pub symbol: String,
40 pub date: String,
41 pub low: f64,
42 pub close: f64,
43 pub open: f64,
44 pub high: f64,
45 pub volume: f64,
46}
47
48pub trait ResultCached<T> {
52 fn is_cached(&self) -> bool;
56
57 fn cache(&self);
61
62 fn load(&self, schema: Option<Schema>) -> Result<DataResult<T>, anyhow::Error>;
66}
67
68#[async_trait]
72pub trait RealTimeData {
73 async fn real_time_data(&self) -> Result<DataResult<DataFrame>, anyhow::Error>;
77
78 fn load_cached_schema(&self) -> Option<Schema>;
82}
83
84pub trait DataResultFormat {
88 fn to_dataframe(&self, source: Option<String>) -> anyhow::Result<DataResult<DataFrame>>;
92
93 fn col_alias(&self) -> Option<Vec<(&str, &str)>>;
97
98 fn format(&self, data_result_format: Option<DataFrame>) -> DataResult<DataFrame>;
102}
103
104pub trait HttpSource {
108 fn request(&self) -> Request;
112
113 fn id(&self) -> String;
117}
118
119#[async_trait]
123pub trait HistoryData {
124 async fn history_daily(
128 self,
129 market: &str,
130 symbol: &str,
131 start: NaiveDate,
132 end: NaiveDate,
133 ) -> Result<DataResult<DataFrame>, anyhow::Error>;
134}
135
136impl ResultCached<DataFrame> for DataResult<DataFrame> {
137 fn is_cached(&self) -> bool {
138 match &self.data_id {
139 None => false,
140 Some(id) => Path::new(&DataResult::cache_file_name(id)).exists(),
141 }
142 }
143
144 fn cache(&self) {
145 match &self.data_id {
146 None => {
147 tracing::warn!("data_id 为空,缓存文件失败");
148 }
149 Some(id) => {
150 let cache_file = DataResult::cache_file_name(id);
151 let file_result = File::create(&cache_file);
152 match file_result {
153 Ok(mut file) => {
154 CsvWriter::new(&mut file)
155 .has_header(true)
156 .finish(&mut self.data.clone().unwrap())
157 .expect("缓存数据存储失败");
158 }
159 Err(e) => {
160 tracing::warn!("{}缓存文件创建失败{}", &cache_file, e);
161 }
162 }
163 }
164 }
165 }
166
167 fn load(&self, schema_opt: Option<Schema>) -> Result<DataResult<DataFrame>, anyhow::Error> {
168 match &self.data_id {
169 None => {
170 tracing::warn!("data_id 为空,加载缓存文件失败");
171 Ok(DataResult::empty())
172 }
173 Some(id) => {
174 let cache_file = DataResult::cache_file_name(id);
175 tracing::debug!("load file path:{:?}", &cache_file);
176
177 let data_frame_result = CsvReader::from_path(&cache_file);
178
179 match data_frame_result {
180 Ok(csv_file) => {
181 if let Some(schema) = schema_opt {
182 let data_frame = csv_file
183 .has_header(true)
184 .with_schema(&schema)
185 .with_parse_dates(true)
186 .finish()
187 .expect("TODO: panic message");
188
189 return Ok(DataResult {
190 data_id: Some(id.clone()),
191 data: Some(data_frame.clone()),
192 });
193 } else {
194 tracing::warn!(
195 "load的schema为None,缓存数据可能加载错误或为空,请提供!!!"
196 )
197 }
198
199 Ok(DataResult::empty())
200 }
201 Err(e) => {
202 tracing::warn!("加载缓存文件{}, 解析失败:{}", &cache_file, e);
203 Ok(DataResult::empty())
204 }
205 }
206 }
207 }
208 }
209}
210
211impl DataResult<DataFrame> {
212 pub fn new(data_id: String, data_frame: DataFrame) -> DataResult<DataFrame> {
213 DataResult {
214 data_id: Some(data_id),
215 data: Some(data_frame),
216 }
217 }
218 fn empty() -> DataResult<DataFrame> {
219 DataResult::<DataFrame> {
220 data_id: None,
221 data: Some(DataFrame::empty()),
222 }
223 }
224
225 fn cache_file_name(data_id: &String) -> String {
226 let cache_temp_home = Envs::cache_temp_home();
227 let path = Path::new(&cache_temp_home);
228 if !&path.try_exists().ok().unwrap() && IoUtils::create_dir_recursive(path).is_err() {
229 tracing::warn!("{} 缓存目录创建失败", &cache_temp_home);
230 }
231
232 format_args!(
233 "{}/{}-{}{}",
234 cache_temp_home,
235 &data_id,
236 utils::DateUtils::now_fmt_ymd(),
237 ".csv"
238 )
239 .to_string()
240 }
241}
242
243impl From<Row<'_>> for StockData {
244 fn from(value: Row) -> Self {
245 let data = value.0;
246 StockData {
247 symbol: data[0].get_str().unwrap().to_string(),
248 date: DateUtils::now_fmt_ymd(),
249 low: data[6].to_string().parse().unwrap(),
250 close: data[2].to_string().parse().unwrap(),
251 open: data[7].to_string().parse().unwrap(),
252 high: data[5].to_string().parse().unwrap(),
253 volume: data[3].to_string().parse().unwrap(),
254 }
255 }
256}
257
258impl From<String> for DataResult<DataFrame> {
259 fn from(value: String) -> Self {
260 tracing::debug!("to DataResult from:{}", value);
261
262 let file = Cursor::new(value);
263 let df = JsonReader::new(file)
264 .with_json_format(JsonFormat::Json)
265 .infer_schema_len(Some(1024))
266 .with_batch_size(10)
267 .finish()
268 .unwrap();
269
270 DataResult::<DataFrame>::new("".to_string(), df)
271 }
272}