commodity_exchange_zh/util/
mod.rs1use crate::{dce, Result};
2use bytesize::ByteSize;
3use regex::Regex;
4use serde::{Deserialize, Deserializer};
5use simplelog::{
6 ColorChoice, Config, ConfigBuilder, LevelFilter, SimpleLogger, TermLogger, TerminalMode,
7};
8use std::{
9 borrow::Cow,
10 fs::File,
11 io::{self, Cursor, ErrorKind, Write},
12 path::{Path, PathBuf},
13 sync::OnceLock,
14};
15use time::{format_description::FormatItem, macros::format_description, Date, OffsetDateTime};
16
17pub mod clickhouse;
18
19pub fn init_log() -> Result<()> {
21 let level = std::env::var("LOG").map_or_else(
22 |_| LevelFilter::Info,
23 |l| l.parse().unwrap_or(LevelFilter::Off),
24 );
25 let mut config = ConfigBuilder::new();
26 config.set_time_offset(time::UtcOffset::from_hms(8, 0, 0)?);
27 TermLogger::init(
28 level,
29 config.build(),
30 TerminalMode::Mixed,
31 ColorChoice::Auto,
32 )?;
33 Ok(())
34}
35
36#[doc(hidden)]
38pub fn init_test_log() -> &'static Init {
39 let level = std::env::var("TEST_LOG").map_or_else(
40 |_| LevelFilter::Off,
41 |l| l.parse().unwrap_or(LevelFilter::Off),
42 );
43 if SimpleLogger::init(level, Config::default()).is_err() {
44 error!("日志开启失败,或许已经设置了日志");
45 }
46 init_data()
47}
48
49#[cfg(feature = "tabled")]
/// Renders an optional value for display: the value's `Display` output for
/// `Some`, and an empty string for `None`.
pub fn display_option<T: std::fmt::Display>(t: &Option<T>) -> String {
    t.as_ref().map_or_else(String::new, ToString::to_string)
}
56
57pub struct Init {
58 pub cache_dir: PathBuf,
59 pub regex_czce: Regex,
60 pub this_year: u16,
61 pub links_dce: dce::DownloadLinks,
62}
63
64pub fn init_data() -> &'static Init {
65 static DATA: OnceLock<Init> = OnceLock::new();
66 DATA.get_or_init(|| Init {
67 cache_dir: cache_dir().unwrap(),
68 regex_czce: Regex::new(",| ").unwrap(),
69 this_year: OffsetDateTime::now_utc()
70 .to_offset(time::macros::offset!(+8))
71 .year()
72 .try_into()
73 .unwrap(),
74 links_dce: dce::DownloadLinks::new_static().unwrap(),
75 })
76}
77
78pub type Response = Result<Cursor<Vec<u8>>>;
79
80pub fn fetch(url: &str) -> Response {
81 let mut buf = Vec::with_capacity(1024 * 1024 * 4);
82 ureq::get(url).call()?.into_reader().read_to_end(&mut buf)?;
83 info!("{url} 获取的字节数:{}", ByteSize(buf.len() as u64));
84 Ok(Cursor::new(buf))
85}
86
87pub fn parse_date_czce<'de, D: Deserializer<'de>>(d: D) -> Result<Date, D::Error> {
88 const FMT: &[FormatItem<'static>] = format_description!("[year]-[month]-[day]");
89 let s = <&str>::deserialize(d)?;
90 Ok(Date::parse(s, FMT)
91 .map_err(|err| format!("{s:?} 无法解析成日期:{err:?}"))
92 .unwrap())
93}
94
95pub fn parse_option_f32<'de, D: Deserializer<'de>>(d: D) -> Result<Option<f32>, D::Error> {
96 let s = <&str>::deserialize(d)?;
97 if s.is_empty() {
98 Ok(None)
99 } else {
100 let float = s
101 .parse()
102 .map_err(|err| format!("{s:?} 无法解析为 f32:{err:?}"))
103 .unwrap();
104 Ok(Some(float))
105 }
106}
107
108pub fn parse_u32_from_f32<'de, D: Deserializer<'de>>(d: D) -> Result<u32, D::Error> {
109 let s = <&str>::deserialize(d)?;
110 let float: f32 = s
111 .parse()
112 .map_err(|_| format!("{s} 无法解析为 f32"))
113 .unwrap();
114 if float < 0.0 {
115 panic!("{s} 无法从 f32 转化为 u32")
116 } else {
117 Ok(float as _)
118 }
119}
120
/// Text encoding that a byte buffer was decoded with (see `read_txt`).
///
/// The variant names are kept as-is because callers match on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Encoding {
    /// The bytes were valid UTF-8.
    UTF8,
    /// The bytes were decoded as GBK.
    GBK,
}
125
126pub fn fetch_zip(
127 url: &str,
128 mut handle_unzipped: impl FnMut(Vec<u8>, String) -> Result<()>,
129) -> Result<()> {
130 let mut fetched = fetch(url)?;
131 let mut zipped = match zip::ZipArchive::new(&mut fetched) {
132 Ok(data) => data,
133 Err(err) => {
134 let file = init_data().cache_dir.join("failed");
135 File::create(&file)?.write_all(fetched.get_ref())?;
136 bail!(
137 "无法解析 zip 文件,下载的内容保存在 {}:{err:?}",
138 file.display()
139 );
140 }
141 };
142 for i in 0..zipped.len() {
143 let mut unzipped = zipped.by_index(i)?;
144 if unzipped.is_file() {
145 let unzipped_path = unzipped
146 .enclosed_name()
147 .ok_or_else(|| eyre!("`{}` 无法转成 &Path", unzipped.name()))?;
148 let size = unzipped.size();
149 let unzipped_path_display = unzipped_path.display().to_string();
150 info!(
151 "{url} 获取的第 {i} 个文件:{unzipped_path_display} ({} => {})",
152 ByteSize(unzipped.compressed_size()),
153 ByteSize(size),
154 );
155 let file_name = unzipped_path
156 .file_name()
157 .and_then(|fname| Some(fname.to_str()?.to_owned()))
158 .ok_or_else(|| eyre!("无法从 {unzipped_path:?} 中获取文件名"))?;
159 let mut buf = Vec::with_capacity(size as usize);
160 io::copy(&mut unzipped, &mut buf)?;
161 handle_unzipped(buf, file_name)?;
162 } else {
163 bail!("{} 还未实现解压成文件夹", unzipped.name());
164 }
165 }
166 Ok(())
167}
168
169pub fn read_txt<'a>(buf: &'a [u8], src: &str) -> Result<(Cow<'a, str>, Encoding)> {
171 let content_encoding = match std::str::from_utf8(buf) {
172 Ok(s) => (s.into(), Encoding::UTF8),
173 Err(_) => {
174 let gbk = encoding_rs::GBK;
175 info!("{src} 不是 UTF8 编码的,尝试使用 GBK 解码");
176 let (cow, encoding, err) = gbk.decode(buf);
177 if err {
178 bail!("{src} 不是 GBK 编码的,需要手动确认编码");
179 } else if encoding != gbk {
180 bail!("{src} GBK/{encoding:?} 解码失败");
181 }
182 (cow, Encoding::GBK)
183 }
184 };
185 Ok(content_encoding)
186}
187
188pub fn cache_dir() -> Result<PathBuf> {
190 const CACHE: &str = "cache";
191 let dir = PathBuf::from(CACHE);
192 match std::fs::create_dir(&dir) {
193 Ok(_) => debug!("成功创建 {CACHE} 目录"),
194 Err(err) => {
195 if matches!(err.kind(), ErrorKind::AlreadyExists) {
196 debug!("{CACHE} 已存在");
197 } else {
198 bail!("无法创建 {CACHE},因为 {err:?}");
199 }
200 }
201 }
202 Ok(dir)
203}
204
205pub fn save_csv(bytes: &[u8], filename: impl AsRef<Path>) -> Result<PathBuf> {
206 let fname = filename.as_ref();
207 let mut path = init_data().cache_dir.join(fname);
208 if !path.set_extension("csv") {
209 error!("{} 无法设置 csv 文件名后缀", fname.display());
210 }
211 File::create(&path)?.write_all(bytes)?;
212 info!(
213 "{} 已被写入 ({})",
214 path.display(),
215 ByteSize(bytes.len() as _)
216 );
217 Ok(path)
218}
219
220pub fn save_to_csv_and_clickhouse<F, G>(csv: F, ch: G) -> Result<()>
221where
222 F: Send + FnOnce() -> Result<PathBuf>,
223 G: Send + FnOnce() -> Result<()>,
224{
225 std::thread::scope(|s| {
226 let task1 = s.spawn(csv);
227 let task2 = s.spawn(ch);
228 match task1.join() {
229 Ok(res) => _ = res?,
230 Err(err) => bail!("save_csv 运行失败:{err:?}"),
231 }
232 match task2.join() {
233 Ok(res) => res?,
234 Err(err) => bail!("保存到 clickhouse 运行失败:{err:?}"),
235 }
236 Ok(())
237 })
238}