cs_csv/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
use csv::StringRecord;
use std::error::Error;
use std::fs::File;
use std::env;
use rust_xlsxwriter::{Workbook, XlsxError};
use std::io::{BufReader, BufRead};
use csv::ReaderBuilder;
use chrono::Local;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// 读取 CSV 文件的表头(假设第一行为表头)
///
/// # 参数
///
/// * `path` - CSV 文件的路径
///
/// # 返回
///
/// 返回一个 `Result<StringRecord, Box<dyn Error>>`,其中 `StringRecord` 包含 CSV 表头。
pub fn read_columns_headers(path: &str) -> Result<StringRecord, Box<dyn Error>> {
// 打开 CSV 文件
let file = File::open(path)?;
// 创建 CSV 读取器
let mut rdr = csv::Reader::from_reader(file);
// 读取 CSV 的表头(假定第一行是表头)
let headers = rdr.headers()?.clone();
// 返回 headers
Ok(headers)
}
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
// const ROWS_PER_FILE: usize = 1_000_000;
// pub fn convert_from_path(file_path: &str) -> Result<(), Box<dyn Error>> {
// println!("开始时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
// let file_name = file_path.split('.').next().unwrap_or("not_found_file");
// let total_rows = count_csv_rows(file_path)?;
// println!("文件总行数(不含表头):{}", total_rows);
// let total_files = (total_rows + ROWS_PER_FILE - 1) / ROWS_PER_FILE;
// println!("预计生成文件数:{}", total_files);
// let file = File::open(file_path)?;
// // 8MB的缓冲区
// let buf_reader = BufReader::with_capacity(8*1024*1024, file);
// let mut reader = ReaderBuilder::new().has_headers(true).from_reader(buf_reader);
// // let mut reader = ReaderBuilder::new().has_headers(true).from_reader(BufReader::new(file));
// // let total_rows = reader.lines().count().saturating_sub(1);
// // println!("Total rows: {}", total_rows);
// let headers: Vec<String> = reader.headers()?.iter().map(|s| s.to_string()).collect();
// let mut file_index = 1;
// let mut current_chunk = Vec::with_capacity(ROWS_PER_FILE);
// // let mut cnt = 0;
// for result in reader.records() {
// let record = result?;
// let row_data: Vec<String> = record.iter().map(|s| s.to_string()).collect();
// current_chunk.push(row_data);
// if current_chunk.len() == ROWS_PER_FILE {
// // cnt += 1;
// // println!("生成任务批次: {}", cnt);
// let file_name = format!("{}-part{}.xlsx", file_name, file_index);
// write_to_excel(&file_name, &headers, ¤t_chunk)?;
// current_chunk.clear();
// file_index += 1;
// }
// }
// if !current_chunk.is_empty() {
// // cnt += 1;
// // println!("生成任务批次: {}", cnt);
// let file_name = format!("{}-part{}.xlsx", file_name, file_index);
// write_to_excel(&file_name, &headers, ¤t_chunk)?;
// }
// println!("结束时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
// // println!("所有文件生成完成,总用时:{}秒", start_time.elapsed()?.as_secs());
// Ok(())
// }
// fn write_to_excel(
// file_name: &str,
// headers: &[String],
// data: &[Vec<String>],
// ) -> Result<(), XlsxError> {
// // println!("Time: {}, Saving file: {}", Local::now().format("%Y-%m-%d %H:%M:%S"), file_name);
// let mut workbook = Workbook::new();
// // let worksheet = workbook.add_worksheet();
// let worksheet = workbook.add_worksheet_with_constant_memory();
// worksheet.write_row(0, 0, headers)?;
// for (row_index, row) in data.iter().enumerate() {
// worksheet.write_row((row_index + 1) as u32, 0, row)?;
// // worksheet.write_multi_row(row, col, data)?;
// }
// workbook.save(file_name)?;
// // println!("Time: {}, Saved file: {}", Local::now().format("%Y-%m-%d %H:%M:%S"), file_name);
// Ok(())
// }
// fn count_csv_rows(file_path: &str) -> Result<usize, Box<dyn Error>> {
// let file = File::open(file_path)?;
// // let reader = BufReader::new(file);
// let reader = BufReader::with_capacity(8*1024*1024, file);
// let total_rows = reader.lines().count().saturating_sub(1); // 使用 saturating_sub 以避免下溢
// Ok(total_rows)
// }
// 假设这是你已有的 XlsxError 类型
// #[derive(Debug)]
// pub struct XlsxError(String);
// impl std::fmt::Display for XlsxError {
// fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
// write!(f, "XlsxError: {}", self.0)
// }
// }
// impl Error for XlsxError {}
// impl From<String> for XlsxError {
// fn from(err: String) -> XlsxError {
// XlsxError(err)
// }
// }
// 假设每个 Excel 文件的最大行数是一个常量
const ROWS_PER_FILE: usize = 100_000;
// 转化函数,增加 cancel_flag 参数,用于检测取消信号
pub fn convert_from_path(file_path: &str, cancel_flag: Arc<AtomicBool>) -> Result<(), Box<dyn Error>> {
println!("开始时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
// 从文件路径获取基本文件名
let base_name = file_path.split('.').next().unwrap_or("not_found_file");
// 在开始计数前检测取消标志
if cancel_flag.load(Ordering::SeqCst) {
println!("取消操作:在计数阶段");
return Err("Operation cancelled".into());
}
let total_rows = count_csv_rows(file_path)?;
println!("文件总行数(不含表头):{}", total_rows);
let total_files = (total_rows + ROWS_PER_FILE - 1) / ROWS_PER_FILE;
println!("预计生成文件数:{}", total_files);
// 打开 CSV 文件,并使用 8MB 缓冲区读取
let file = File::open(file_path)?;
let buf_reader = BufReader::with_capacity(8 * 1024 * 1024, file);
let mut reader = ReaderBuilder::new().has_headers(true).from_reader(buf_reader);
// 获取表头
let headers: Vec<String> = reader
.headers()?
.iter()
.map(|s| s.to_string())
.collect();
let mut file_index = 1;
let mut current_chunk: Vec<Vec<String>> = Vec::with_capacity(ROWS_PER_FILE);
// 逐条处理 CSV 记录
for result in reader.records() {
// 每次读取记录前检查取消信号
if cancel_flag.load(Ordering::SeqCst) {
println!("检测到取消信号,提前退出转换过程");
return Err("Operation cancelled".into());
}
let record: StringRecord = result?;
let row_data: Vec<String> = record.iter().map(|s| s.to_string()).collect();
current_chunk.push(row_data);
// 当累计达到设定行数时,写入一个 Excel 文件
if current_chunk.len() == ROWS_PER_FILE {
// 写入前再次检查取消信号
if cancel_flag.load(Ordering::SeqCst) {
println!("检测到取消信号,在写入文件前取消");
return Err("Operation cancelled".into());
}
let output_file_name = format!("{}-part{}.xlsx", base_name, file_index);
write_to_excel(&output_file_name, &headers, ¤t_chunk, Arc::clone(&cancel_flag))?;
current_chunk.clear();
file_index += 1;
}
}
// 如果剩余未满一批,则写入最后一个文件之前检查取消
if !current_chunk.is_empty() {
if cancel_flag.load(Ordering::SeqCst) {
println!("检测到取消信号,在写入剩余文件前取消");
return Err("Operation cancelled".into());
}
let output_file_name = format!("{}-part{}.xlsx", base_name, file_index);
write_to_excel(&output_file_name, &headers, ¤t_chunk, Arc::clone(&cancel_flag))?;
}
println!("结束时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
Ok(())
}
// 将指定数据写入 Excel 文件的函数(这里暂不增加内部取消检查,如有需要可在循环中添加)
fn write_to_excel(
file_name: &str,
headers: &[String],
data: &[Vec<String>],
cancel_flag: Arc<AtomicBool>
) -> Result<(), Box<dyn Error>> {
// 创建 Excel 工作簿对象(此处以 Workbook 为例,你可以使用相应的 Excel 写入库)
let mut workbook = Workbook::new();
let worksheet = workbook.add_worksheet_with_constant_memory();
// 写入表头前检查是否被取消
if cancel_flag.load(Ordering::SeqCst) {
println!("取消信号:在写入表头前取消");
// return Err(XlsxError("Operation cancelled".to_string()));
return Err("Operation cancelled".into());
}
// 写入表头
worksheet.write_row(0, 0, headers)?;
// 逐行写入数据
for (row_index, row) in data.iter().enumerate() {
if cancel_flag.load(Ordering::SeqCst) {
println!("取消信号:在写入第 {} 行时取消", row_index + 1);
return Err("Operation cancelled".into());
}
worksheet.write_row((row_index + 1) as u32, 0, row)?;
}
workbook.save(file_name)?;
Ok(())
}
// 计算 CSV 文件的行数(不包含表头)
fn count_csv_rows(file_path: &str) -> Result<usize, Box<dyn Error>> {
let file = File::open(file_path)?;
let reader = BufReader::with_capacity(8 * 1024 * 1024, file);
// 计算行数并减去表头
let total_rows = reader.lines().count().saturating_sub(1);
Ok(total_rows)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
#[test]
fn test_read_columns_headers() {
// 创建一个临时 CSV 文件,写入测试数据
let mut temp_file = NamedTempFile::new().expect("创建临时文件失败");
writeln!(temp_file, "name,age,city").expect("写入测试数据失败");
writeln!(temp_file, "Alice,30,New York").expect("写入测试数据失败");
// 调用库函数
let file = "data.csv";
let headers = read_columns_headers(file);
// println!("headers, {:?}", headers);
println!("-------------------");
// let headers = read_columns_headers(temp_file.path().to_str().unwrap()).expect("读取 CSV 表头失败");
// 检查表头内容
let expected = csv::StringRecord::from(vec!["name", "age", "city"]);
// assert_eq!(headers, expected);
}
#[test]
fn test_convert_from_path() {
// 调用库函数
let file = "data.csv";
let flag = Arc::new(AtomicBool::new(false));
convert_from_path(file, flag);
}
}