cs_csv/lib.rs

use csv::StringRecord;
use std::error::Error;
use std::fs::File;
use std::env;
use rust_xlsxwriter::{Workbook, XlsxError};
use std::io::{BufReader, BufRead};
use csv::ReaderBuilder;
use chrono::Local;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// 读取 CSV 文件的表头(假设第一行为表头)
///
/// # 参数
///
/// * `path` - CSV 文件的路径
///
/// # 返回
///
/// 返回一个 `Result<StringRecord, Box<dyn Error>>`,其中 `StringRecord` 包含 CSV 表头。
pub fn read_columns_headers(path: &str) -> Result<StringRecord, Box<dyn Error>> {
// 打开 CSV 文件
let file = File::open(path)?;
// 创建 CSV 读取器
let mut rdr = csv::Reader::from_reader(file);
// 读取 CSV 的表头(假定第一行是表头)
let headers = rdr.headers()?.clone();
// 返回 headers
Ok(headers)
}
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
// const ROWS_PER_FILE: usize = 1_000_000;
// pub fn convert_from_path(file_path: &str) -> Result<(), Box<dyn Error>> {
// println!("开始时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
// let file_name = file_path.split('.').next().unwrap_or("not_found_file");
// let total_rows = count_csv_rows(file_path)?;
// println!("文件总行数(不含表头):{}", total_rows);
// let total_files = (total_rows + ROWS_PER_FILE - 1) / ROWS_PER_FILE;
// println!("预计生成文件数:{}", total_files);
// let file = File::open(file_path)?;
// // 8MB的缓冲区
// let buf_reader = BufReader::with_capacity(8*1024*1024, file);
// let mut reader = ReaderBuilder::new().has_headers(true).from_reader(buf_reader);
// // let mut reader = ReaderBuilder::new().has_headers(true).from_reader(BufReader::new(file));
// // let total_rows = reader.lines().count().saturating_sub(1);
// // println!("Total rows: {}", total_rows);
// let headers: Vec<String> = reader.headers()?.iter().map(|s| s.to_string()).collect();
// let mut file_index = 1;
// let mut current_chunk = Vec::with_capacity(ROWS_PER_FILE);
// // let mut cnt = 0;
// for result in reader.records() {
// let record = result?;
// let row_data: Vec<String> = record.iter().map(|s| s.to_string()).collect();
// current_chunk.push(row_data);
// if current_chunk.len() == ROWS_PER_FILE {
// // cnt += 1;
// // println!("生成任务批次: {}", cnt);
// let file_name = format!("{}-part{}.xlsx", file_name, file_index);
// write_to_excel(&file_name, &headers, ¤t_chunk)?;
// current_chunk.clear();
// file_index += 1;
// }
// }
// if !current_chunk.is_empty() {
// // cnt += 1;
// // println!("生成任务批次: {}", cnt);
// let file_name = format!("{}-part{}.xlsx", file_name, file_index);
// write_to_excel(&file_name, &headers, ¤t_chunk)?;
// }
// println!("结束时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
// // println!("所有文件生成完成,总用时:{}秒", start_time.elapsed()?.as_secs());
// Ok(())
// }
// fn write_to_excel(
// file_name: &str,
// headers: &[String],
// data: &[Vec<String>],
// ) -> Result<(), XlsxError> {
// // println!("Time: {}, Saving file: {}", Local::now().format("%Y-%m-%d %H:%M:%S"), file_name);
// let mut workbook = Workbook::new();
// // let worksheet = workbook.add_worksheet();
// let worksheet = workbook.add_worksheet_with_constant_memory();
// worksheet.write_row(0, 0, headers)?;
// for (row_index, row) in data.iter().enumerate() {
// worksheet.write_row((row_index + 1) as u32, 0, row)?;
// // worksheet.write_multi_row(row, col, data)?;
// }
// workbook.save(file_name)?;
// // println!("Time: {}, Saved file: {}", Local::now().format("%Y-%m-%d %H:%M:%S"), file_name);
// Ok(())
// }
// fn count_csv_rows(file_path: &str) -> Result<usize, Box<dyn Error>> {
// let file = File::open(file_path)?;
// // let reader = BufReader::new(file);
// let reader = BufReader::with_capacity(8*1024*1024, file);
// let total_rows = reader.lines().count().saturating_sub(1); // 使用 saturating_sub 以避免下溢
// Ok(total_rows)
// }
// 假设这是你已有的 XlsxError 类型
// #[derive(Debug)]
// pub struct XlsxError(String);
// impl std::fmt::Display for XlsxError {
// fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
// write!(f, "XlsxError: {}", self.0)
// }
// }
// impl Error for XlsxError {}
// impl From<String> for XlsxError {
// fn from(err: String) -> XlsxError {
// XlsxError(err)
// }
// }
// 假设每个 Excel 文件的最大行数是一个常量
const ROWS_PER_FILE: usize = 100_000;
// 转化函数,增加 cancel_flag 参数,用于检测取消信号
pub fn convert_from_path(file_path: &str, cancel_flag: Arc<AtomicBool>) -> Result<(), Box<dyn Error>> {
println!("开始时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
// 从文件路径获取基本文件名
let base_name = file_path.split('.').next().unwrap_or("not_found_file");
// 在开始计数前检测取消标志
if cancel_flag.load(Ordering::SeqCst) {
println!("取消操作:在计数阶段");
return Err("Operation cancelled".into());
}
let total_rows = count_csv_rows(file_path)?;
println!("文件总行数(不含表头):{}", total_rows);
let total_files = (total_rows + ROWS_PER_FILE - 1) / ROWS_PER_FILE;
println!("预计生成文件数:{}", total_files);
// 打开 CSV 文件,并使用 8MB 缓冲区读取
let file = File::open(file_path)?;
let buf_reader = BufReader::with_capacity(8 * 1024 * 1024, file);
let mut reader = ReaderBuilder::new().has_headers(true).from_reader(buf_reader);
// 获取表头
let headers: Vec<String> = reader
.headers()?
.iter()
.map(|s| s.to_string())
.collect();
let mut file_index = 1;
let mut current_chunk: Vec<Vec<String>> = Vec::with_capacity(ROWS_PER_FILE);
// 逐条处理 CSV 记录
for result in reader.records() {
// 每次读取记录前检查取消信号
if cancel_flag.load(Ordering::SeqCst) {
println!("检测到取消信号,提前退出转换过程");
return Err("Operation cancelled".into());
}
let record: StringRecord = result?;
let row_data: Vec<String> = record.iter().map(|s| s.to_string()).collect();
current_chunk.push(row_data);
// 当累计达到设定行数时,写入一个 Excel 文件
if current_chunk.len() == ROWS_PER_FILE {
// 写入前再次检查取消信号
if cancel_flag.load(Ordering::SeqCst) {
println!("检测到取消信号,在写入文件前取消");
return Err("Operation cancelled".into());
}
let output_file_name = format!("{}-part{}.xlsx", base_name, file_index);
write_to_excel(&output_file_name, &headers, ¤t_chunk, Arc::clone(&cancel_flag))?;
current_chunk.clear();
file_index += 1;
}
}
// 如果剩余未满一批,则写入最后一个文件之前检查取消
if !current_chunk.is_empty() {
if cancel_flag.load(Ordering::SeqCst) {
println!("检测到取消信号,在写入剩余文件前取消");
return Err("Operation cancelled".into());
}
let output_file_name = format!("{}-part{}.xlsx", base_name, file_index);
write_to_excel(&output_file_name, &headers, ¤t_chunk, Arc::clone(&cancel_flag))?;
}
println!("结束时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
Ok(())
}
// 将指定数据写入 Excel 文件的函数(这里暂不增加内部取消检查,如有需要可在循环中添加)
fn write_to_excel(
file_name: &str,
headers: &[String],
data: &[Vec<String>],
cancel_flag: Arc<AtomicBool>
) -> Result<(), Box<dyn Error>> {
// 创建 Excel 工作簿对象(此处以 Workbook 为例,你可以使用相应的 Excel 写入库)
let mut workbook = Workbook::new();
let worksheet = workbook.add_worksheet_with_constant_memory();
// 写入表头前检查是否被取消
if cancel_flag.load(Ordering::SeqCst) {
println!("取消信号:在写入表头前取消");
// return Err(XlsxError("Operation cancelled".to_string()));
return Err("Operation cancelled".into());
}
// 写入表头
worksheet.write_row(0, 0, headers)?;
// 逐行写入数据
for (row_index, row) in data.iter().enumerate() {
if cancel_flag.load(Ordering::SeqCst) {
println!("取消信号:在写入第 {} 行时取消", row_index + 1);
return Err("Operation cancelled".into());
}
worksheet.write_row((row_index + 1) as u32, 0, row)?;
}
workbook.save(file_name)?;
Ok(())
}
// 计算 CSV 文件的行数(不包含表头)
fn count_csv_rows(file_path: &str) -> Result<usize, Box<dyn Error>> {
let file = File::open(file_path)?;
let reader = BufReader::with_capacity(8 * 1024 * 1024, file);
// 计算行数并减去表头
let total_rows = reader.lines().count().saturating_sub(1);
Ok(total_rows)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
#[test]
fn test_read_columns_headers() {
// 创建一个临时 CSV 文件,写入测试数据
let mut temp_file = NamedTempFile::new().expect("创建临时文件失败");
writeln!(temp_file, "name,age,city").expect("写入测试数据失败");
writeln!(temp_file, "Alice,30,New York").expect("写入测试数据失败");
// 调用库函数
let file = "data.csv";
let headers = read_columns_headers(file);
// println!("headers, {:?}", headers);
println!("-------------------");
// let headers = read_columns_headers(temp_file.path().to_str().unwrap()).expect("读取 CSV 表头失败");
// 检查表头内容
let expected = csv::StringRecord::from(vec!["name", "age", "city"]);
// assert_eq!(headers, expected);
}
#[test]
fn test_convert_from_path() {
// 调用库函数
let file = "data.csv";
let flag = Arc::new(AtomicBool::new(false));
convert_from_path(file, flag);
}
}