cs_csv/lib.rs
1use csv::StringRecord;
2use std::error::Error;
3use std::fs::File;
4use std::env;
5
6use rust_xlsxwriter::{Workbook, XlsxError};
7use std::io::{BufReader, BufRead};
8use csv::ReaderBuilder;
9use chrono::Local;
10
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13
14
15
16/// 读取 CSV 文件的表头(假设第一行为表头)
17///
18/// # 参数
19///
20/// * `path` - CSV 文件的路径
21///
22/// # 返回
23///
24/// 返回一个 `Result<StringRecord, Box<dyn Error>>`,其中 `StringRecord` 包含 CSV 表头。
25pub fn read_columns_headers(path: &str) -> Result<StringRecord, Box<dyn Error>> {
26 // 打开 CSV 文件
27 let file = File::open(path)?;
28
29 // 创建 CSV 读取器
30 let mut rdr = csv::Reader::from_reader(file);
31
32 // 读取 CSV 的表头(假定第一行是表头)
33 let headers = rdr.headers()?.clone();
34
35 // 返回 headers
36 Ok(headers)
37}
38
39
40pub fn add(left: u64, right: u64) -> u64 {
41 left + right
42}
43
44
45// const ROWS_PER_FILE: usize = 1_000_000;
46
47
48// pub fn convert_from_path(file_path: &str) -> Result<(), Box<dyn Error>> {
49// println!("开始时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
50// let file_name = file_path.split('.').next().unwrap_or("not_found_file");
51
52// let total_rows = count_csv_rows(file_path)?;
53// println!("文件总行数(不含表头):{}", total_rows);
54// let total_files = (total_rows + ROWS_PER_FILE - 1) / ROWS_PER_FILE;
55// println!("预计生成文件数:{}", total_files);
56
57// let file = File::open(file_path)?;
58// // 8MB的缓冲区
59// let buf_reader = BufReader::with_capacity(8*1024*1024, file);
60
61// let mut reader = ReaderBuilder::new().has_headers(true).from_reader(buf_reader);
62// // let mut reader = ReaderBuilder::new().has_headers(true).from_reader(BufReader::new(file));
63
64// // let total_rows = reader.lines().count().saturating_sub(1);
65// // println!("Total rows: {}", total_rows);
66// let headers: Vec<String> = reader.headers()?.iter().map(|s| s.to_string()).collect();
67
68// let mut file_index = 1;
69// let mut current_chunk = Vec::with_capacity(ROWS_PER_FILE);
70
71// // let mut cnt = 0;
72
73// for result in reader.records() {
74// let record = result?;
75// let row_data: Vec<String> = record.iter().map(|s| s.to_string()).collect();
76// current_chunk.push(row_data);
77
78// if current_chunk.len() == ROWS_PER_FILE {
79// // cnt += 1;
80// // println!("生成任务批次: {}", cnt);
81// let file_name = format!("{}-part{}.xlsx", file_name, file_index);
82// write_to_excel(&file_name, &headers, ¤t_chunk)?;
83// current_chunk.clear();
84// file_index += 1;
85
86// }
87// }
88
89// if !current_chunk.is_empty() {
90// // cnt += 1;
91// // println!("生成任务批次: {}", cnt);
92// let file_name = format!("{}-part{}.xlsx", file_name, file_index);
93// write_to_excel(&file_name, &headers, ¤t_chunk)?;
94
95// }
96
97// println!("结束时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
98// // println!("所有文件生成完成,总用时:{}秒", start_time.elapsed()?.as_secs());
99// Ok(())
100// }
101
102
103// fn write_to_excel(
104// file_name: &str,
105// headers: &[String],
106// data: &[Vec<String>],
107// ) -> Result<(), XlsxError> {
108// // println!("Time: {}, Saving file: {}", Local::now().format("%Y-%m-%d %H:%M:%S"), file_name);
109// let mut workbook = Workbook::new();
110
111// // let worksheet = workbook.add_worksheet();
112
113// let worksheet = workbook.add_worksheet_with_constant_memory();
114
115// worksheet.write_row(0, 0, headers)?;
116
117// for (row_index, row) in data.iter().enumerate() {
118// worksheet.write_row((row_index + 1) as u32, 0, row)?;
119// // worksheet.write_multi_row(row, col, data)?;
120// }
121// workbook.save(file_name)?;
122// // println!("Time: {}, Saved file: {}", Local::now().format("%Y-%m-%d %H:%M:%S"), file_name);
123// Ok(())
124// }
125
126// fn count_csv_rows(file_path: &str) -> Result<usize, Box<dyn Error>> {
127// let file = File::open(file_path)?;
128// // let reader = BufReader::new(file);
129// let reader = BufReader::with_capacity(8*1024*1024, file);
130// let total_rows = reader.lines().count().saturating_sub(1); // 使用 saturating_sub 以避免下溢
131// Ok(total_rows)
132// }
133
134
135// 假设这是你已有的 XlsxError 类型
136// #[derive(Debug)]
137// pub struct XlsxError(String);
138// impl std::fmt::Display for XlsxError {
139// fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
140// write!(f, "XlsxError: {}", self.0)
141// }
142// }
143// impl Error for XlsxError {}
144// impl From<String> for XlsxError {
145// fn from(err: String) -> XlsxError {
146// XlsxError(err)
147// }
148// }
149
150
151
152// 假设每个 Excel 文件的最大行数是一个常量
153const ROWS_PER_FILE: usize = 1_000_000;
154
155// 转化函数,增加 cancel_flag 参数,用于检测取消信号
156pub fn convert_from_path(file_path: &str, cancel_flag: &Arc<AtomicBool>) -> Result<(), Box<dyn Error>> {
157 println!("开始时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
158
159 // 从文件路径获取基本文件名
160 let base_name = file_path.split('.').next().unwrap_or("not_found_file");
161
162 // 在开始计数前检测取消标志
163 if cancel_flag.load(Ordering::SeqCst) {
164 println!("取消操作:在计数阶段");
165 return Err("Operation cancelled".into());
166 }
167 let total_rows = count_csv_rows(file_path)?;
168 println!("文件总行数(不含表头):{}", total_rows);
169 let total_files = (total_rows + ROWS_PER_FILE - 1) / ROWS_PER_FILE;
170 println!("预计生成文件数:{}", total_files);
171
172 // 打开 CSV 文件,并使用 8MB 缓冲区读取
173 let file = File::open(file_path)?;
174 let buf_reader = BufReader::with_capacity(8 * 1024 * 1024, file);
175 let mut reader = ReaderBuilder::new().has_headers(true).from_reader(buf_reader);
176
177 // 获取表头
178 let headers: Vec<String> = reader
179 .headers()?
180 .iter()
181 .map(|s| s.to_string())
182 .collect();
183
184 let mut file_index = 1;
185 let mut current_chunk: Vec<Vec<String>> = Vec::with_capacity(ROWS_PER_FILE);
186
187 // 逐条处理 CSV 记录
188 for result in reader.records() {
189 // 每次读取记录前检查取消信号
190 if cancel_flag.load(Ordering::SeqCst) {
191 println!("检测到取消信号,提前退出转换过程");
192 return Err("Operation cancelled".into());
193 }
194
195 let record: StringRecord = result?;
196 let row_data: Vec<String> = record.iter().map(|s| s.to_string()).collect();
197 current_chunk.push(row_data);
198
199 // 当累计达到设定行数时,写入一个 Excel 文件
200 if current_chunk.len() == ROWS_PER_FILE {
201 // 写入前再次检查取消信号
202 if cancel_flag.load(Ordering::SeqCst) {
203 println!("检测到取消信号,在写入文件前取消");
204 return Err("Operation cancelled".into());
205 }
206 let output_file_name = format!("{}-part{}.xlsx", base_name, file_index);
207 write_to_excel(&output_file_name, &headers, ¤t_chunk, Arc::clone(&cancel_flag))?;
208 current_chunk.clear();
209 file_index += 1;
210 }
211 }
212
213 // 如果剩余未满一批,则写入最后一个文件之前检查取消
214 if !current_chunk.is_empty() {
215 if cancel_flag.load(Ordering::SeqCst) {
216 println!("检测到取消信号,在写入剩余文件前取消");
217 return Err("Operation cancelled".into());
218 }
219 let output_file_name = format!("{}-part{}.xlsx", base_name, file_index);
220 write_to_excel(&output_file_name, &headers, ¤t_chunk, Arc::clone(&cancel_flag))?;
221 }
222
223 println!("结束时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
224 Ok(())
225}
226
227// 将指定数据写入 Excel 文件的函数(这里暂不增加内部取消检查,如有需要可在循环中添加)
228fn write_to_excel(
229 file_name: &str,
230 headers: &[String],
231 data: &[Vec<String>],
232 cancel_flag: Arc<AtomicBool>
233) -> Result<(), Box<dyn Error>> {
234 // 创建 Excel 工作簿对象(此处以 Workbook 为例,你可以使用相应的 Excel 写入库)
235 let mut workbook = Workbook::new();
236 let worksheet = workbook.add_worksheet_with_constant_memory();
237
238 // 写入表头前检查是否被取消
239 if cancel_flag.load(Ordering::SeqCst) {
240 println!("取消信号:在写入表头前取消");
241 // return Err(XlsxError("Operation cancelled".to_string()));
242 return Err("Operation cancelled".into());
243 }
244
245 // 写入表头
246 worksheet.write_row(0, 0, headers)?;
247
248 // 逐行写入数据
249 for (row_index, row) in data.iter().enumerate() {
250 if cancel_flag.load(Ordering::SeqCst) {
251 println!("取消信号:在写入第 {} 行时取消", row_index + 1);
252 return Err("Operation cancelled".into());
253 }
254 worksheet.write_row((row_index + 1) as u32, 0, row)?;
255 }
256 workbook.save(file_name)?;
257 Ok(())
258}
259
260// 计算 CSV 文件的行数(不包含表头)
261fn count_csv_rows(file_path: &str) -> Result<usize, Box<dyn Error>> {
262 let file = File::open(file_path)?;
263 let reader = BufReader::with_capacity(8 * 1024 * 1024, file);
264 // 计算行数并减去表头
265 let total_rows = reader.lines().count().saturating_sub(1);
266 Ok(total_rows)
267}
268
269#[cfg(test)]
270mod tests {
271 use super::*;
272 use std::io::Write;
273 use tempfile::NamedTempFile;
274
275 #[test]
276 fn it_works() {
277 let result = add(2, 2);
278 assert_eq!(result, 4);
279 }
280
281
282 #[test]
283 fn test_read_columns_headers() {
284 // 创建一个临时 CSV 文件,写入测试数据
285 let mut temp_file = NamedTempFile::new().expect("创建临时文件失败");
286 writeln!(temp_file, "name,age,city").expect("写入测试数据失败");
287 writeln!(temp_file, "Alice,30,New York").expect("写入测试数据失败");
288
289 // 调用库函数
290 let file = "data.csv";
291 let headers = read_columns_headers(file);
292 // println!("headers, {:?}", headers);
293 println!("-------------------");
294 // let headers = read_columns_headers(temp_file.path().to_str().unwrap()).expect("读取 CSV 表头失败");
295
296 // 检查表头内容
297 let expected = csv::StringRecord::from(vec!["name", "age", "city"]);
298 // assert_eq!(headers, expected);
299 }
300
301 #[test]
302 fn test_convert_from_path() {
303 // 调用库函数
304 let file = "data.csv";
305 let flag = Arc::new(AtomicBool::new(false));
306 convert_from_path(file, &flag);
307 }
308}