commodity_exchange_zh/dce/mod.rs

1use crate::{util, Result, Str};
2use bincode::{Decode, Encode};
3use calamine::{DataType, Reader};
4use color_eyre::eyre::{Context, ContextCompat};
5use indexmap::{Equivalent, IndexMap};
6use serde::{Deserialize, Serialize};
7use std::io;
8use time::Date;
9
10mod parse;
11pub use parse::parse_download_links;
12mod select;
13pub use select::select;
14
/// bincode-encoded [`DownloadLinks`] table, embedded at compile time from
/// `tests/dce.bincode` and decoded by [`DownloadLinks::new_static`].
pub static DOWNLOAD_LINKS: &[u8] = include_bytes!("../../tests/dce.bincode");
/// Host prefix that [`get_url`] prepends to the link paths stored in [`DownloadLinks`].
pub const URL_PREFIX: &str = "http://www.dce.com.cn";
17
/// Table of DCE download links: maps a [`Key`] (year + product name) to a URL path
/// that [`get_url`] appends to [`URL_PREFIX`].
///
/// The inner `IndexMap` is (de)serialized through serde inside bincode.
#[derive(Debug, Decode, Encode, PartialEq, Eq)]
pub struct DownloadLinks(#[bincode(with_serde)] IndexMap<Key, String>);
20
21impl DownloadLinks {
22    pub fn new_static() -> Result<DownloadLinks> {
23        let d = bincode::decode_from_slice::<DownloadLinks, _>(
24            DOWNLOAD_LINKS,
25            bincode::config::standard(),
26        )?;
27        Ok(d.0)
28    }
29    pub fn iter(&self) -> impl Iterator<Item = (&Key, &String)> {
30        self.0.iter()
31    }
32    #[allow(clippy::len_without_is_empty)]
33    pub fn len(&self) -> usize {
34        self.0.len()
35    }
36}
37
/// Lookup key for a download link: a year plus a product (品种) name.
///
/// Field order matters: the derived `Ord` compares `year` first, then `name`.
#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
#[cfg_attr(feature = "tabled", derive(tabled::Tabled))]
pub struct Key {
    /// Year of the data file.
    pub year: u16,
    /// Product (品种) name.
    pub name: Str,
}
44
45impl std::fmt::Display for Key {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        let Key { year, name } = self;
48        write!(f, "({year}, {name})")
49    }
50}
51
52impl Equivalent<Key> for (u16, &str) {
53    fn equivalent(&self, key: &Key) -> bool {
54        self.0 == key.year && self.1 == key.name
55    }
56}
57
58pub fn get_url(year: u16, name: &str) -> Result<String> {
59    let index_map = &util::init_data().links_dce.0;
60    let postfix = index_map
61        .get(&(year, name))
62        .with_context(|| format!("无法找到 {year} 年 {name} 品种的下载链接"))?;
63    Ok(format!("{URL_PREFIX}{postfix}"))
64}
65
66/// 读取 xlsx 文件,并处理解析过的每行数据
67pub fn read_xlsx<R: io::Read + io::Seek>(
68    mut wb: calamine::Xlsx<R>,
69    mut handle: impl FnMut(Data) -> Result<()>,
70) -> Result<()> {
71    let sheet = match wb.worksheet_range_at(0) {
72        Some(Ok(sheet)) => sheet,
73        Some(Err(err)) => bail!("无法读取第 0 个表,因为 {err:?}"),
74        None => bail!("无法读取第 0 个表"),
75    };
76    let mut rows = sheet.rows();
77    let header = rows.next().context("无法读取第一行")?;
78    let pos = parse::parse_xslx_header(header)?;
79    for row in rows {
80        handle(Data::new(row, &pos)?)?;
81    }
82    Ok(())
83}
84
85pub fn run(year: u16, name: &str) -> Result<()> {
86    let link = get_url(year, name)?;
87    let xlsx = if link.ends_with(".xlsx") || link.ends_with(".csv") {
88        // xxx.csv 其实也是 xlsx 文件 :(
89        util::fetch(&link)?
90    } else if link.ends_with(".zip") {
91        // TODO: zip 压缩的是 csv 文件(文件名乱码),所以需要解析 (GBK 编码)
92        // NOTE: zip 文件只在 2017 年及其之前提供,并且无法通过直接的 get 下载到
93        //       (貌似最重要的是请求时带上 cookies,但它有时效性,很快失效,因此暂时不要 .zip 数据)
94        // tests/snapshots/data__dce-downloadlink.snap
95        // let mut v = Vec::with_capacity(1);
96        util::fetch_zip(&link, |raw, fname| {
97            let (csv, _) = util::read_txt(&raw, &fname)?;
98            for line in csv.lines().take(3) {
99                info!("{line}");
100            }
101            // v.push((raw, fname));
102            Ok(())
103        })?;
104        return Ok(());
105        // ensure!(
106        //     v.len() == 1,
107        //     "无法处理 {link} 内的多文件:{:?}",
108        //     v.iter().map(|v| &v.1).collect::<Vec<_>>()
109        // );
110        // Cursor::new(v.remove(0).0)
111    } else {
112        bail!("暂时无法处理 {link},因为只支持 xlsx 或者 zip 文件");
113    };
114    let len = xlsx.get_ref().len();
115    let mut writer = csv::WriterBuilder::new()
116        .has_headers(false)
117        .buffer_capacity(len)
118        .from_writer(Vec::with_capacity(len));
119    read_xlsx(calamine::Xlsx::new(xlsx)?, |data| {
120        writer.serialize(&data)?;
121        Ok(())
122    })?;
123    writer.flush()?;
124    let fname = format!("dce-{year}-{name}.csv");
125    let bytes = writer.get_ref();
126    util::save_to_csv_and_clickhouse(
127        || util::save_csv(bytes, &fname),
128        || {
129            util::clickhouse::execute(include_str!("../sql/dce.sql"))?;
130            const TABLE: &str = "qihuo.dce";
131            util::clickhouse::insert_with_count_reported(TABLE, writer.get_ref())
132        },
133    )?;
134    Ok(())
135}
136
/// One parsed data row of a DCE xlsx file, produced by `Data::new` inside `read_xlsx`.
///
/// `run` serializes these with a headerless `csv::Writer`, so the field order below is
/// also the CSV column order.
#[derive(Debug, Serialize)]
#[cfg_attr(feature = "tabled", derive(tabled::Tabled))]
pub struct Data {
    /// Contract code (合约代码).
    pub code: Str,
    /// Trading date (交易日期).
    pub date: Date,
    /// Previous settlement price (昨结算). Redundant: it is the previous trading day's `settle`.
    pub prev: f32,
    /// Opening price (今开盘).
    pub open: f32,
    /// Highest price (最高价).
    pub high: f32,
    /// Lowest price (最低价).
    pub low: f32,
    /// Closing price (今收盘).
    pub close: f32,
    /// Settlement price (今结算); it becomes the next trading day's `prev`.
    pub settle: f32,
    /// Change 1 (涨跌1). Redundant: equals `close - prev`.
    pub zd1: f32,
    /// Change 2 (涨跌2). Redundant: equals `settle - prev`.
    pub zd2: f32,
    /// Trading volume, counted on both sides (成交量, 双边).
    pub vol: u32,
    /// Turnover (交易额). The original author suspected it is single-sided and in yuan,
    /// roughly `settle * vol / 2 * contract multiplier`.
    /// NOTE(review): if the unit really is yuan, large contracts could exceed `u32::MAX`
    /// (~4.29e9). Confirm the unit and value range against the source files.
    pub amount: u32,
    /// Open interest, counted on both sides (持仓量, 双边).
    pub position: u32,
}
167
168impl Data {
169    pub fn new(row: &[DataType], pos: &[usize]) -> Result<Data> {
170        use parse::{as_date, as_f32, as_str, as_u32, LEN};
171
172        ensure!(pos.len() == LEN, "xlsx 的表头有效列不足 {LEN}:{pos:?}");
173        let err = |n: usize| format!("{row:?} 无法获取到第 {n} 个单元格数据");
174        Ok(Data {
175            code: as_str(row.get(pos[0]).with_context(|| err(0))?)?,
176            date: as_date(row.get(pos[1]).with_context(|| err(1))?)?,
177            prev: as_f32(row.get(pos[2]).with_context(|| err(2))?)?,
178            open: as_f32(row.get(pos[3]).with_context(|| err(3))?)?,
179            high: as_f32(row.get(pos[4]).with_context(|| err(4))?)?,
180            low: as_f32(row.get(pos[5]).with_context(|| err(5))?)?,
181            close: as_f32(row.get(pos[6]).with_context(|| err(6))?)?,
182            settle: as_f32(row.get(pos[7]).with_context(|| err(7))?)?,
183            zd1: as_f32(row.get(pos[8]).with_context(|| err(8))?)?,
184            zd2: as_f32(row.get(pos[9]).with_context(|| err(9))?)?,
185            vol: as_u32(row.get(pos[10]).with_context(|| err(10))?)?,
186            amount: as_u32(row.get(pos[11]).with_context(|| err(11))?)?,
187            position: as_u32(row.get(pos[12]).with_context(|| err(12))?)?,
188        })
189    }
190}