commodity_exchange_zh/util/
mod.rs

1use crate::{dce, Result};
2use bytesize::ByteSize;
3use regex::Regex;
4use serde::{Deserialize, Deserializer};
5use simplelog::{
6    ColorChoice, Config, ConfigBuilder, LevelFilter, SimpleLogger, TermLogger, TerminalMode,
7};
8use std::{
9    borrow::Cow,
10    fs::File,
11    io::{self, Cursor, ErrorKind, Write},
12    path::{Path, PathBuf},
13    sync::OnceLock,
14};
15use time::{format_description::FormatItem, macros::format_description, Date, OffsetDateTime};
16
17pub mod clickhouse;
18
19/// 开启日志
20pub fn init_log() -> Result<()> {
21    let level = std::env::var("LOG").map_or_else(
22        |_| LevelFilter::Info,
23        |l| l.parse().unwrap_or(LevelFilter::Off),
24    );
25    let mut config = ConfigBuilder::new();
26    config.set_time_offset(time::UtcOffset::from_hms(8, 0, 0)?);
27    TermLogger::init(
28        level,
29        config.build(),
30        TerminalMode::Mixed,
31        ColorChoice::Auto,
32    )?;
33    Ok(())
34}
35
36/// 测试函数的日志
37#[doc(hidden)]
38pub fn init_test_log() -> &'static Init {
39    let level = std::env::var("TEST_LOG").map_or_else(
40        |_| LevelFilter::Off,
41        |l| l.parse().unwrap_or(LevelFilter::Off),
42    );
43    if SimpleLogger::init(level, Config::default()).is_err() {
44        error!("日志开启失败,或许已经设置了日志");
45    }
46    init_data()
47}
48
/// Formats an optional value: `Some(v)` becomes `v.to_string()`, `None` becomes
/// an empty string.
///
/// Only compiled when the `tabled` feature is enabled.
#[cfg(feature = "tabled")]
pub fn display_option<T: std::fmt::Display>(t: &Option<T>) -> String {
    t.as_ref().map_or_else(String::new, ToString::to_string)
}
56
57pub struct Init {
58    pub cache_dir: PathBuf,
59    pub regex_czce: Regex,
60    pub this_year: u16,
61    pub links_dce: dce::DownloadLinks,
62}
63
64pub fn init_data() -> &'static Init {
65    static DATA: OnceLock<Init> = OnceLock::new();
66    DATA.get_or_init(|| Init {
67        cache_dir: cache_dir().unwrap(),
68        regex_czce: Regex::new(",| ").unwrap(),
69        this_year: OffsetDateTime::now_utc()
70            .to_offset(time::macros::offset!(+8))
71            .year()
72            .try_into()
73            .unwrap(),
74        links_dce: dce::DownloadLinks::new_static().unwrap(),
75    })
76}
77
78pub type Response = Result<Cursor<Vec<u8>>>;
79
80pub fn fetch(url: &str) -> Response {
81    let mut buf = Vec::with_capacity(1024 * 1024 * 4);
82    ureq::get(url).call()?.into_reader().read_to_end(&mut buf)?;
83    info!("{url} 获取的字节数:{}", ByteSize(buf.len() as u64));
84    Ok(Cursor::new(buf))
85}
86
87pub fn parse_date_czce<'de, D: Deserializer<'de>>(d: D) -> Result<Date, D::Error> {
88    const FMT: &[FormatItem<'static>] = format_description!("[year]-[month]-[day]");
89    let s = <&str>::deserialize(d)?;
90    Ok(Date::parse(s, FMT)
91        .map_err(|err| format!("{s:?} 无法解析成日期:{err:?}"))
92        .unwrap())
93}
94
95pub fn parse_option_f32<'de, D: Deserializer<'de>>(d: D) -> Result<Option<f32>, D::Error> {
96    let s = <&str>::deserialize(d)?;
97    if s.is_empty() {
98        Ok(None)
99    } else {
100        let float = s
101            .parse()
102            .map_err(|err| format!("{s:?} 无法解析为 f32:{err:?}"))
103            .unwrap();
104        Ok(Some(float))
105    }
106}
107
108pub fn parse_u32_from_f32<'de, D: Deserializer<'de>>(d: D) -> Result<u32, D::Error> {
109    let s = <&str>::deserialize(d)?;
110    let float: f32 = s
111        .parse()
112        .map_err(|_| format!("{s} 无法解析为 f32"))
113        .unwrap();
114    if float < 0.0 {
115        panic!("{s} 无法从 f32 转化为 u32")
116    } else {
117        Ok(float as _)
118    }
119}
120
/// Text encoding that a byte buffer was decoded with (see `read_txt`).
///
/// The derives let callers log, compare and copy the detected encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Encoding {
    /// The bytes were valid UTF-8.
    UTF8,
    /// The bytes were decoded as GBK.
    GBK,
}
125
126pub fn fetch_zip(
127    url: &str,
128    mut handle_unzipped: impl FnMut(Vec<u8>, String) -> Result<()>,
129) -> Result<()> {
130    let mut fetched = fetch(url)?;
131    let mut zipped = match zip::ZipArchive::new(&mut fetched) {
132        Ok(data) => data,
133        Err(err) => {
134            let file = init_data().cache_dir.join("failed");
135            File::create(&file)?.write_all(fetched.get_ref())?;
136            bail!(
137                "无法解析 zip 文件,下载的内容保存在 {}:{err:?}",
138                file.display()
139            );
140        }
141    };
142    for i in 0..zipped.len() {
143        let mut unzipped = zipped.by_index(i)?;
144        if unzipped.is_file() {
145            let unzipped_path = unzipped
146                .enclosed_name()
147                .ok_or_else(|| eyre!("`{}` 无法转成 &Path", unzipped.name()))?;
148            let size = unzipped.size();
149            let unzipped_path_display = unzipped_path.display().to_string();
150            info!(
151                "{url} 获取的第 {i} 个文件:{unzipped_path_display} ({} => {})",
152                ByteSize(unzipped.compressed_size()),
153                ByteSize(size),
154            );
155            let file_name = unzipped_path
156                .file_name()
157                .and_then(|fname| Some(fname.to_str()?.to_owned()))
158                .ok_or_else(|| eyre!("无法从 {unzipped_path:?} 中获取文件名"))?;
159            let mut buf = Vec::with_capacity(size as usize);
160            io::copy(&mut unzipped, &mut buf)?;
161            handle_unzipped(buf, file_name)?;
162        } else {
163            bail!("{} 还未实现解压成文件夹", unzipped.name());
164        }
165    }
166    Ok(())
167}
168
169/// 处理编码
170pub fn read_txt<'a>(buf: &'a [u8], src: &str) -> Result<(Cow<'a, str>, Encoding)> {
171    let content_encoding = match std::str::from_utf8(buf) {
172        Ok(s) => (s.into(), Encoding::UTF8),
173        Err(_) => {
174            let gbk = encoding_rs::GBK;
175            info!("{src} 不是 UTF8 编码的,尝试使用 GBK 解码");
176            let (cow, encoding, err) = gbk.decode(buf);
177            if err {
178                bail!("{src} 不是 GBK 编码的,需要手动确认编码");
179            } else if encoding != gbk {
180                bail!("{src} GBK/{encoding:?} 解码失败");
181            }
182            (cow, Encoding::GBK)
183        }
184    };
185    Ok(content_encoding)
186}
187
188/// 缓存目录
189pub fn cache_dir() -> Result<PathBuf> {
190    const CACHE: &str = "cache";
191    let dir = PathBuf::from(CACHE);
192    match std::fs::create_dir(&dir) {
193        Ok(_) => debug!("成功创建 {CACHE} 目录"),
194        Err(err) => {
195            if matches!(err.kind(), ErrorKind::AlreadyExists) {
196                debug!("{CACHE} 已存在");
197            } else {
198                bail!("无法创建 {CACHE},因为 {err:?}");
199            }
200        }
201    }
202    Ok(dir)
203}
204
205pub fn save_csv(bytes: &[u8], filename: impl AsRef<Path>) -> Result<PathBuf> {
206    let fname = filename.as_ref();
207    let mut path = init_data().cache_dir.join(fname);
208    if !path.set_extension("csv") {
209        error!("{} 无法设置 csv 文件名后缀", fname.display());
210    }
211    File::create(&path)?.write_all(bytes)?;
212    info!(
213        "{} 已被写入 ({})",
214        path.display(),
215        ByteSize(bytes.len() as _)
216    );
217    Ok(path)
218}
219
220pub fn save_to_csv_and_clickhouse<F, G>(csv: F, ch: G) -> Result<()>
221where
222    F: Send + FnOnce() -> Result<PathBuf>,
223    G: Send + FnOnce() -> Result<()>,
224{
225    std::thread::scope(|s| {
226        let task1 = s.spawn(csv);
227        let task2 = s.spawn(ch);
228        match task1.join() {
229            Ok(res) => _ = res?,
230            Err(err) => bail!("save_csv 运行失败:{err:?}"),
231        }
232        match task2.join() {
233            Ok(res) => res?,
234            Err(err) => bail!("保存到 clickhouse 运行失败:{err:?}"),
235        }
236        Ok(())
237    })
238}