qshare/
lib.rs

1use crate::utils::{DateUtils, Envs};
2
3use async_trait::async_trait;
4use core::convert::From;
5use polars::export::chrono::NaiveDate;
6use polars::frame::row::Row;
7use polars::frame::DataFrame;
8use polars::io::{SerReader, SerWriter};
9use polars::prelude::{CsvReader, CsvWriter, JsonFormat, JsonReader, Schema};
10use reqwest::Request;
11use serde::{Deserialize, Serialize};
12use std::fs::File;
13use std::io::Cursor;
14use std::path::Path;
15use utils::IoUtils;
16
17pub mod cffex;
18pub mod const_vars;
19pub mod sina;
20pub mod utils;
21
/// Generic container for a piece of fetched data together with its identifier.
///
/// Design note carried over from the original comment: enums model the
/// data-driven parts of the crate and traits model the behaviour-driven parts;
/// a data service gains the ability to fetch data by combining a data source
/// with the data types implemented for it.
#[derive(Clone, Debug, Default)]
pub struct DataResult<T> {
    /// Identifier of the payload; `None` when no identifier has been assigned.
    pub data_id: Option<String>,
    /// The payload itself; `None` when nothing has been loaded.
    pub data: Option<T>,
}
33
34///
35/// HLOC
36///
37#[derive(Debug, Deserialize, Serialize)]
38pub struct StockData {
39    pub symbol: String,
40    pub date: String,
41    pub low: f64,
42    pub close: f64,
43    pub open: f64,
44    pub high: f64,
45    pub volume: f64,
46}
47
48///
49/// 缓存结果,避免频繁调用导致ip被封
50///
51pub trait ResultCached<T> {
52    ///
53    /// 判断缓存是否存在
54    ///
55    fn is_cached(&self) -> bool;
56
57    ///
58    /// 缓存数据
59    ///
60    fn cache(&self);
61
62    ///
63    /// 加载缓存
64    ///
65    fn load(&self, schema: Option<Schema>) -> Result<DataResult<T>, anyhow::Error>;
66}
67
68///
69/// 实时行情
70///
71#[async_trait]
72pub trait RealTimeData {
73    ///
74    /// 实时行情
75    ///
76    async fn real_time_data(&self) -> Result<DataResult<DataFrame>, anyhow::Error>;
77
78    ///
79    /// 加载缓存时schema信息
80    ///
81    fn load_cached_schema(&self) -> Option<Schema>;
82}
83
84///
85/// data result 格式化处理
86///
87pub trait DataResultFormat {
88    ///
89    /// 对提供的数据进行处理生成DataFrame
90    ///
91    fn to_dataframe(&self, source: Option<String>) -> anyhow::Result<DataResult<DataFrame>>;
92
93    ///
94    /// 列别名
95    ///
96    fn col_alias(&self) -> Option<Vec<(&str, &str)>>;
97
98    ///
99    /// 格式化程序,包括列名重命名及加载缓存时schema信息
100    ///
101    fn format(&self, data_result_format: Option<DataFrame>) -> DataResult<DataFrame>;
102}
103
104///
105/// http data source
106///
107pub trait HttpSource {
108    ///
109    /// 构造请求
110    ///
111    fn request(&self) -> Request;
112
113    ///
114    /// id 生产策略
115    ///
116    fn id(&self) -> String;
117}
118
119///
120/// 历史行情
121///
122#[async_trait]
123pub trait HistoryData {
124    ///
125    /// 日行情
126    ///
127    async fn history_daily(
128        self,
129        market: &str,
130        symbol: &str,
131        start: NaiveDate,
132        end: NaiveDate,
133    ) -> Result<DataResult<DataFrame>, anyhow::Error>;
134}
135
136impl ResultCached<DataFrame> for DataResult<DataFrame> {
137    fn is_cached(&self) -> bool {
138        match &self.data_id {
139            None => false,
140            Some(id) => Path::new(&DataResult::cache_file_name(id)).exists(),
141        }
142    }
143
144    fn cache(&self) {
145        match &self.data_id {
146            None => {
147                tracing::warn!("data_id 为空,缓存文件失败");
148            }
149            Some(id) => {
150                let cache_file = DataResult::cache_file_name(id);
151                let file_result = File::create(&cache_file);
152                match file_result {
153                    Ok(mut file) => {
154                        CsvWriter::new(&mut file)
155                            .has_header(true)
156                            .finish(&mut self.data.clone().unwrap())
157                            .expect("缓存数据存储失败");
158                    }
159                    Err(e) => {
160                        tracing::warn!("{}缓存文件创建失败{}", &cache_file, e);
161                    }
162                }
163            }
164        }
165    }
166
167    fn load(&self, schema_opt: Option<Schema>) -> Result<DataResult<DataFrame>, anyhow::Error> {
168        match &self.data_id {
169            None => {
170                tracing::warn!("data_id 为空,加载缓存文件失败");
171                Ok(DataResult::empty())
172            }
173            Some(id) => {
174                let cache_file = DataResult::cache_file_name(id);
175                tracing::debug!("load file path:{:?}", &cache_file);
176
177                let data_frame_result = CsvReader::from_path(&cache_file);
178
179                match data_frame_result {
180                    Ok(csv_file) => {
181                        if let Some(schema) = schema_opt {
182                            let data_frame = csv_file
183                                .has_header(true)
184                                .with_schema(&schema)
185                                .with_parse_dates(true)
186                                .finish()
187                                .expect("TODO: panic message");
188
189                            return Ok(DataResult {
190                                data_id: Some(id.clone()),
191                                data: Some(data_frame.clone()),
192                            });
193                        } else {
194                            tracing::warn!(
195                                "load的schema为None,缓存数据可能加载错误或为空,请提供!!!"
196                            )
197                        }
198
199                        Ok(DataResult::empty())
200                    }
201                    Err(e) => {
202                        tracing::warn!("加载缓存文件{}, 解析失败:{}", &cache_file, e);
203                        Ok(DataResult::empty())
204                    }
205                }
206            }
207        }
208    }
209}
210
211impl DataResult<DataFrame> {
212    pub fn new(data_id: String, data_frame: DataFrame) -> DataResult<DataFrame> {
213        DataResult {
214            data_id: Some(data_id),
215            data: Some(data_frame),
216        }
217    }
218    fn empty() -> DataResult<DataFrame> {
219        DataResult::<DataFrame> {
220            data_id: None,
221            data: Some(DataFrame::empty()),
222        }
223    }
224
225    fn cache_file_name(data_id: &String) -> String {
226        let cache_temp_home = Envs::cache_temp_home();
227        let path = Path::new(&cache_temp_home);
228        if !&path.try_exists().ok().unwrap() && IoUtils::create_dir_recursive(path).is_err() {
229            tracing::warn!("{} 缓存目录创建失败", &cache_temp_home);
230        }
231
232        format_args!(
233            "{}/{}-{}{}",
234            cache_temp_home,
235            &data_id,
236            utils::DateUtils::now_fmt_ymd(),
237            ".csv"
238        )
239        .to_string()
240    }
241}
242
243impl From<Row<'_>> for StockData {
244    fn from(value: Row) -> Self {
245        let data = value.0;
246        StockData {
247            symbol: data[0].get_str().unwrap().to_string(),
248            date: DateUtils::now_fmt_ymd(),
249            low: data[6].to_string().parse().unwrap(),
250            close: data[2].to_string().parse().unwrap(),
251            open: data[7].to_string().parse().unwrap(),
252            high: data[5].to_string().parse().unwrap(),
253            volume: data[3].to_string().parse().unwrap(),
254        }
255    }
256}
257
258impl From<String> for DataResult<DataFrame> {
259    fn from(value: String) -> Self {
260        tracing::debug!("to DataResult from:{}", value);
261
262        let file = Cursor::new(value);
263        let df = JsonReader::new(file)
264            .with_json_format(JsonFormat::Json)
265            .infer_schema_len(Some(1024))
266            .with_batch_size(10)
267            .finish()
268            .unwrap();
269
270        DataResult::<DataFrame>::new("".to_string(), df)
271    }
272}