csv2xlsx_line/
lib.rs

1use csv::StringRecord;
2use std::error::Error;
3use std::fs::File;
4use std::env;
5
6use rust_xlsxwriter::{Workbook, XlsxError};
7use std::io::{BufReader, BufRead};
8use csv::ReaderBuilder;
9use chrono::Local;
10
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13
14use std::path::{Path, PathBuf};
15
16/// 读取 CSV 文件的表头(假设第一行为表头)
17///
18/// # 参数
19///
20/// * `path` - CSV 文件的路径
21///
22/// # 返回
23///
24/// 返回一个 `Result<StringRecord, Box<dyn Error>>`,其中 `StringRecord` 包含 CSV 表头。
25pub fn read_columns_headers(path: &str) -> Result<StringRecord, Box<dyn Error>> {
26    // 打开 CSV 文件
27    let file = File::open(path)?;
28    
29    // 创建 CSV 读取器
30    let mut rdr = csv::Reader::from_reader(file);
31    
32    // 读取 CSV 的表头(假定第一行是表头)
33    let headers = rdr.headers()?.clone();
34    
35    // 返回 headers
36    Ok(headers)
37}
38
39// 假设每个 Excel 文件的最大行数是一个常量
40const ROWS_PER_FILE: usize = 1_000_000;
41
42// 转化函数,增加 cancel_flag 参数,用于检测取消信号
43pub fn convert_from_path(file_path: &str,  output_dir: &str, cancel_flag: &Arc<AtomicBool>) -> Result<(), Box<dyn Error>> {
44    println!("开始时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
45
46    // 确保输出目录存在
47    std::fs::create_dir_all(output_dir)?;
48    
49    // 从文件路径获取基本文件名
50    // let base_name = file_path.split('.').next().unwrap_or("not_found_file");
51    let path = Path::new(file_path);
52    let base_name = path
53        .file_stem()
54        .and_then(|os| os.to_str())
55        .unwrap_or("not_found_file");
56
57    // 在开始计数前检测取消标志
58    if cancel_flag.load(Ordering::SeqCst) {
59        println!("取消操作:在计数阶段");
60        return Err("Operation cancelled".into());
61    }
62    let total_rows = count_csv_rows(file_path)?;
63    println!("文件总行数(不含表头):{}", total_rows);
64    let total_files = (total_rows + ROWS_PER_FILE - 1) / ROWS_PER_FILE;
65    println!("预计生成文件数:{}", total_files);
66
67    // 打开 CSV 文件,并使用 8MB 缓冲区读取
68    let file = File::open(file_path)?;
69    let buf_reader = BufReader::with_capacity(8 * 1024 * 1024, file);
70    let mut reader = ReaderBuilder::new().has_headers(true).from_reader(buf_reader);
71
72    // 获取表头
73    let headers: Vec<String> = reader
74        .headers()?
75        .iter()
76        .map(|s| s.to_string())
77        .collect();
78
79    let mut file_index = 1;
80    let mut current_chunk: Vec<Vec<String>> = Vec::with_capacity(ROWS_PER_FILE);
81
82    // 逐条处理 CSV 记录
83    for result in reader.records() {
84        // 每次读取记录前检查取消信号
85        if cancel_flag.load(Ordering::SeqCst) {
86            println!("检测到取消信号,提前退出转换过程");
87            return Err("Operation cancelled".into());
88        }
89
90        let record: StringRecord = result?;
91        let row_data: Vec<String> = record.iter().map(|s| s.to_string()).collect();
92        current_chunk.push(row_data);
93
94        // 当累计达到设定行数时,写入一个 Excel 文件
95        if current_chunk.len() == ROWS_PER_FILE {
96            // 写入前再次检查取消信号
97            if cancel_flag.load(Ordering::SeqCst) {
98                println!("检测到取消信号,在写入文件前取消");
99                return Err("Operation cancelled".into());
100            }
101            // let output_file_name = format!("{}-part{}.xlsx", base_name, file_index);
102            // write_to_excel(&output_file_name, &headers, &current_chunk,  Arc::clone(&cancel_flag))?;
103            // 构建完整输出路径
104            let output_path = Path::new(output_dir)
105                .join(format!("{}-part{}.xlsx", base_name, file_index));
106            println!("写入文件:{}", output_path.to_string_lossy());
107            write_to_excel(&output_path.to_string_lossy(), &headers, &current_chunk,  Arc::clone(&cancel_flag))?;
108            current_chunk.clear();
109            file_index += 1;
110        }
111    }
112
113    // 如果剩余未满一批,则写入最后一个文件之前检查取消
114    if !current_chunk.is_empty() {
115        if cancel_flag.load(Ordering::SeqCst) {
116            println!("检测到取消信号,在写入剩余文件前取消");
117            return Err("Operation cancelled".into());
118        }
119        // let output_file_name = format!("{}-part{}.xlsx", base_name, file_index);
120        // write_to_excel(&output_file_name, &headers, &current_chunk,  Arc::clone(&cancel_flag))?;
121        let output_path = Path::new(output_dir)
122            .join(format!("{}-part{}.xlsx", base_name, file_index));
123        println!("写入文件:{}", output_path.to_string_lossy());
124        write_to_excel(&output_path.to_string_lossy(), &headers, &current_chunk,  Arc::clone(&cancel_flag))?;
125    }
126
127    println!("结束时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
128    Ok(())
129}
130
131// 将指定数据写入 Excel 文件的函数(这里暂不增加内部取消检查,如有需要可在循环中添加)
132fn write_to_excel(
133    file_path: &str,
134    headers: &[String],
135    data: &[Vec<String>],
136    cancel_flag: Arc<AtomicBool>
137) -> Result<(), Box<dyn Error>> {
138    // 确保目标目录存在
139    if let Some(parent) = Path::new(file_path).parent() {
140        std::fs::create_dir_all(parent)?;
141    }
142
143    // 创建 Excel 工作簿对象(此处以 Workbook 为例,你可以使用相应的 Excel 写入库)
144    let mut workbook = Workbook::new();
145    let worksheet = workbook.add_worksheet_with_constant_memory();
146
147    // 写入表头前检查是否被取消
148    if cancel_flag.load(Ordering::SeqCst) {
149        println!("取消信号:在写入表头前取消");
150        // return Err(XlsxError("Operation cancelled".to_string()));
151        return Err("Operation cancelled".into());
152    }
153
154    // 写入表头
155    worksheet.write_row(0, 0, headers)?;
156
157    // 逐行写入数据
158    for (row_index, row) in data.iter().enumerate() {
159        if cancel_flag.load(Ordering::SeqCst) {
160            println!("取消信号:在写入第 {} 行时取消", row_index + 1);
161            return Err("Operation cancelled".into());
162        }
163        worksheet.write_row((row_index + 1) as u32, 0, row)?;
164    }
165    workbook.save(file_path)?;
166    Ok(())
167}
168
169// 计算 CSV 文件的行数(不包含表头)
170fn count_csv_rows(file_path: &str) -> Result<usize, Box<dyn Error>> {
171    let file = File::open(file_path)?;
172    let reader = BufReader::with_capacity(8 * 1024 * 1024, file);
173    // 计算行数并减去表头
174    let total_rows = reader.lines().count().saturating_sub(1);
175    Ok(total_rows)
176}
177
178#[cfg(test)]
179mod tests {
180    use super::*;
181    use std::io::Write;
182    use tempfile::NamedTempFile;
183
184    // #[test]
185    // fn test_read_columns_headers() {
186    //     // 创建一个临时 CSV 文件,写入测试数据
187    //     let mut temp_file = NamedTempFile::new().expect("创建临时文件失败");
188    //     writeln!(temp_file, "name,age,city").expect("写入测试数据失败");
189    //     writeln!(temp_file, "Alice,30,New York").expect("写入测试数据失败");
190        
191    //     // 调用库函数
192    //     let file = "data.csv";
193    //     let headers = read_columns_headers(file);
194    //     // println!("headers, {:?}", headers);
195    //     println!("-------------------");
196    //     // let headers = read_columns_headers(temp_file.path().to_str().unwrap()).expect("读取 CSV 表头失败");
197        
198    //     // 检查表头内容
199    //     let expected = csv::StringRecord::from(vec!["name", "age", "city"]);
200    //     // assert_eq!(headers, expected);
201    // }
202
203    #[test]
204    fn test_convert_from_path() {
205        // 调用库函数
206        // let file = "data.csv";
207        let file_path = "D:\\oracle-export-test\\csv\\aaa.csv";
208        let output_dir = "D:\\oracle-export-test\\xlsx";
209        let flag = Arc::new(AtomicBool::new(false));
210        convert_from_path(file_path, output_dir, &flag);
211    }
212}