1use csv::StringRecord;
2use std::error::Error;
3use std::fs::File;
4use std::env;
5
6use rust_xlsxwriter::{Workbook, XlsxError};
7use std::io::{BufReader, BufRead};
8use csv::ReaderBuilder;
9use chrono::Local;
10
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13
14use std::path::{Path, PathBuf};
15
16pub fn read_columns_headers(path: &str) -> Result<StringRecord, Box<dyn Error>> {
26 let file = File::open(path)?;
28
29 let mut rdr = csv::Reader::from_reader(file);
31
32 let headers = rdr.headers()?.clone();
34
35 Ok(headers)
37}
38
/// Maximum number of data rows (header excluded) buffered and written into a
/// single output workbook before a new part file is started.
// NOTE(review): presumably chosen to stay below Excel's per-sheet row limit
// (1,048,576 rows including the header) — confirm before changing.
const ROWS_PER_FILE: usize = 1_000_000;
41
42pub fn convert_from_path(file_path: &str, output_dir: &str, cancel_flag: &Arc<AtomicBool>) -> Result<(), Box<dyn Error>> {
44 println!("开始时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
45
46 std::fs::create_dir_all(output_dir)?;
48
49 let path = Path::new(file_path);
52 let base_name = path
53 .file_stem()
54 .and_then(|os| os.to_str())
55 .unwrap_or("not_found_file");
56
57 if cancel_flag.load(Ordering::SeqCst) {
59 println!("取消操作:在计数阶段");
60 return Err("Operation cancelled".into());
61 }
62 let total_rows = count_csv_rows(file_path)?;
63 println!("文件总行数(不含表头):{}", total_rows);
64 let total_files = (total_rows + ROWS_PER_FILE - 1) / ROWS_PER_FILE;
65 println!("预计生成文件数:{}", total_files);
66
67 let file = File::open(file_path)?;
69 let buf_reader = BufReader::with_capacity(8 * 1024 * 1024, file);
70 let mut reader = ReaderBuilder::new().has_headers(true).from_reader(buf_reader);
71
72 let headers: Vec<String> = reader
74 .headers()?
75 .iter()
76 .map(|s| s.to_string())
77 .collect();
78
79 let mut file_index = 1;
80 let mut current_chunk: Vec<Vec<String>> = Vec::with_capacity(ROWS_PER_FILE);
81
82 for result in reader.records() {
84 if cancel_flag.load(Ordering::SeqCst) {
86 println!("检测到取消信号,提前退出转换过程");
87 return Err("Operation cancelled".into());
88 }
89
90 let record: StringRecord = result?;
91 let row_data: Vec<String> = record.iter().map(|s| s.to_string()).collect();
92 current_chunk.push(row_data);
93
94 if current_chunk.len() == ROWS_PER_FILE {
96 if cancel_flag.load(Ordering::SeqCst) {
98 println!("检测到取消信号,在写入文件前取消");
99 return Err("Operation cancelled".into());
100 }
101 let output_path = Path::new(output_dir)
105 .join(format!("{}-part{}.xlsx", base_name, file_index));
106 println!("写入文件:{}", output_path.to_string_lossy());
107 write_to_excel(&output_path.to_string_lossy(), &headers, ¤t_chunk, Arc::clone(&cancel_flag))?;
108 current_chunk.clear();
109 file_index += 1;
110 }
111 }
112
113 if !current_chunk.is_empty() {
115 if cancel_flag.load(Ordering::SeqCst) {
116 println!("检测到取消信号,在写入剩余文件前取消");
117 return Err("Operation cancelled".into());
118 }
119 let output_path = Path::new(output_dir)
122 .join(format!("{}-part{}.xlsx", base_name, file_index));
123 println!("写入文件:{}", output_path.to_string_lossy());
124 write_to_excel(&output_path.to_string_lossy(), &headers, ¤t_chunk, Arc::clone(&cancel_flag))?;
125 }
126
127 println!("结束时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
128 Ok(())
129}
130
131fn write_to_excel(
133 file_path: &str,
134 headers: &[String],
135 data: &[Vec<String>],
136 cancel_flag: Arc<AtomicBool>
137) -> Result<(), Box<dyn Error>> {
138 if let Some(parent) = Path::new(file_path).parent() {
140 std::fs::create_dir_all(parent)?;
141 }
142
143 let mut workbook = Workbook::new();
145 let worksheet = workbook.add_worksheet_with_constant_memory();
146
147 if cancel_flag.load(Ordering::SeqCst) {
149 println!("取消信号:在写入表头前取消");
150 return Err("Operation cancelled".into());
152 }
153
154 worksheet.write_row(0, 0, headers)?;
156
157 for (row_index, row) in data.iter().enumerate() {
159 if cancel_flag.load(Ordering::SeqCst) {
160 println!("取消信号:在写入第 {} 行时取消", row_index + 1);
161 return Err("Operation cancelled".into());
162 }
163 worksheet.write_row((row_index + 1) as u32, 0, row)?;
164 }
165 workbook.save(file_path)?;
166 Ok(())
167}
168
/// Counts the lines of the file at `file_path`, excluding the first (header) line.
///
/// This is a plain line count, not a CSV parse: a quoted field containing a line break is
/// counted as several rows, so treat the result as an estimate. An empty file yields 0.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or if reading from it fails.
fn count_csv_rows(file_path: &str) -> Result<usize, Box<dyn Error>> {
    let file = File::open(file_path)?;
    let mut reader = BufReader::with_capacity(8 * 1024 * 1024, file);

    // Read raw bytes into one reused buffer: no per-line `String` allocation, no UTF-8
    // requirement, and read errors surface through `?` instead of being counted as lines.
    let mut line_buf: Vec<u8> = Vec::new();
    let mut line_count: usize = 0;
    loop {
        line_buf.clear();
        if reader.read_until(b'\n', &mut line_buf)? == 0 {
            break;
        }
        line_count += 1;
    }

    // The first line is the header; `saturating_sub` keeps an empty file at 0.
    Ok(line_count.saturating_sub(1))
}
177
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes a small CSV fixture into a fresh temporary file.
    fn sample_csv() -> NamedTempFile {
        let mut csv_file = NamedTempFile::new().expect("failed to create temporary CSV file");
        write!(csv_file, "id,name\n1,alice\n2,bob\n").expect("failed to write CSV fixture");
        csv_file.flush().expect("failed to flush CSV fixture");
        csv_file
    }

    /// Converting a small CSV succeeds and produces exactly the first part file.
    #[test]
    fn test_convert_from_path() {
        let csv_file = sample_csv();
        let output_dir = tempfile::tempdir().expect("failed to create temporary output dir");
        let flag = Arc::new(AtomicBool::new(false));

        let csv_path = csv_file.path().to_str().expect("CSV path is not valid UTF-8");
        let out_dir = output_dir.path().to_str().expect("output path is not valid UTF-8");

        convert_from_path(csv_path, out_dir, &flag).expect("conversion should succeed");

        let stem = csv_file
            .path()
            .file_stem()
            .and_then(|s| s.to_str())
            .expect("temporary CSV file has no UTF-8 stem");
        let expected = output_dir.path().join(format!("{}-part1.xlsx", stem));
        assert!(
            expected.is_file(),
            "expected output workbook at {}",
            expected.display()
        );
    }

    /// A cancel flag that is already set aborts the conversion before any workbook is written.
    #[test]
    fn test_convert_from_path_cancelled() {
        let csv_file = sample_csv();
        let output_dir = tempfile::tempdir().expect("failed to create temporary output dir");
        let flag = Arc::new(AtomicBool::new(true));

        let csv_path = csv_file.path().to_str().expect("CSV path is not valid UTF-8");
        let out_dir = output_dir.path().to_str().expect("output path is not valid UTF-8");

        let result = convert_from_path(csv_path, out_dir, &flag);
        assert!(result.is_err(), "cancelled conversion must return an error");

        let written = std::fs::read_dir(output_dir.path())
            .expect("failed to list output dir")
            .count();
        assert_eq!(written, 0, "no workbook should be written after cancellation");
    }
}