cs_csv/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
use csv::StringRecord;
use std::error::Error;
use std::fs::File;
use std::env;

use rust_xlsxwriter::{Workbook, XlsxError};
use std::io::{BufReader, BufRead};
use csv::ReaderBuilder;
use chrono::Local;

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;



/// 读取 CSV 文件的表头(假设第一行为表头)
///
/// # 参数
///
/// * `path` - CSV 文件的路径
///
/// # 返回
///
/// 返回一个 `Result<StringRecord, Box<dyn Error>>`,其中 `StringRecord` 包含 CSV 表头。
pub fn read_columns_headers(path: &str) -> Result<StringRecord, Box<dyn Error>> {
    // 打开 CSV 文件
    let file = File::open(path)?;
    
    // 创建 CSV 读取器
    let mut rdr = csv::Reader::from_reader(file);
    
    // 读取 CSV 的表头(假定第一行是表头)
    let headers = rdr.headers()?.clone();
    
    // 返回 headers
    Ok(headers)
}


pub fn add(left: u64, right: u64) -> u64 {
    left + right
}


// const ROWS_PER_FILE: usize = 1_000_000;


// pub fn convert_from_path(file_path: &str) -> Result<(), Box<dyn Error>> {
//     println!("开始时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
//     let file_name = file_path.split('.').next().unwrap_or("not_found_file");

//     let total_rows = count_csv_rows(file_path)?;
//     println!("文件总行数(不含表头):{}", total_rows);
//     let total_files = (total_rows + ROWS_PER_FILE - 1) / ROWS_PER_FILE;
//     println!("预计生成文件数:{}", total_files);

//     let file = File::open(file_path)?;
//     // 8MB的缓冲区
//     let buf_reader = BufReader::with_capacity(8*1024*1024, file);

//     let mut reader = ReaderBuilder::new().has_headers(true).from_reader(buf_reader);
//     // let mut reader = ReaderBuilder::new().has_headers(true).from_reader(BufReader::new(file));

//     // let total_rows = reader.lines().count().saturating_sub(1); 
//     // println!("Total rows: {}", total_rows);
//     let headers: Vec<String> = reader.headers()?.iter().map(|s| s.to_string()).collect();

//     let mut file_index = 1;
//     let mut current_chunk = Vec::with_capacity(ROWS_PER_FILE);

//     // let mut cnt = 0;

//     for result in reader.records() {
//         let record = result?;
//         let row_data: Vec<String> = record.iter().map(|s| s.to_string()).collect();
//         current_chunk.push(row_data);

//         if current_chunk.len() == ROWS_PER_FILE {
//             // cnt += 1;
//             // println!("生成任务批次: {}", cnt);
//             let file_name = format!("{}-part{}.xlsx", file_name, file_index);
//             write_to_excel(&file_name, &headers, &current_chunk)?;
//             current_chunk.clear();
//             file_index += 1;

//         }
//     }

//     if !current_chunk.is_empty() {
//         // cnt += 1;
//         // println!("生成任务批次: {}", cnt);
//         let file_name = format!("{}-part{}.xlsx", file_name, file_index);
//         write_to_excel(&file_name, &headers, &current_chunk)?;

//     }

//     println!("结束时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
//     // println!("所有文件生成完成,总用时:{}秒", start_time.elapsed()?.as_secs());
//     Ok(())
// }


// fn write_to_excel(
//     file_name: &str,
//     headers: &[String],
//     data: &[Vec<String>],
// ) -> Result<(), XlsxError> {
//     // println!("Time: {}, Saving file: {}", Local::now().format("%Y-%m-%d %H:%M:%S"), file_name);
//     let mut workbook = Workbook::new();

//     // let worksheet = workbook.add_worksheet();

//     let worksheet = workbook.add_worksheet_with_constant_memory();

//     worksheet.write_row(0, 0, headers)?;

//     for (row_index, row) in data.iter().enumerate() {
//         worksheet.write_row((row_index + 1) as u32, 0, row)?;
//         // worksheet.write_multi_row(row, col, data)?;
//     }
//     workbook.save(file_name)?;
//     // println!("Time: {}, Saved file: {}", Local::now().format("%Y-%m-%d %H:%M:%S"), file_name);
//     Ok(())
// }

// fn count_csv_rows(file_path: &str) -> Result<usize, Box<dyn Error>> {
//     let file = File::open(file_path)?;
//     // let reader = BufReader::new(file);
//     let reader = BufReader::with_capacity(8*1024*1024, file);
//     let total_rows = reader.lines().count().saturating_sub(1);  // 使用 saturating_sub 以避免下溢
//     Ok(total_rows)
// }


// 假设这是你已有的 XlsxError 类型
// #[derive(Debug)]
// pub struct XlsxError(String);
// impl std::fmt::Display for XlsxError {
//     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
//         write!(f, "XlsxError: {}", self.0)
//     }
// }
// impl Error for XlsxError {}
// impl From<String> for XlsxError {
//     fn from(err: String) -> XlsxError {
//         XlsxError(err)
//     }
// }



// 假设每个 Excel 文件的最大行数是一个常量
const ROWS_PER_FILE: usize = 100_000;

// 转化函数,增加 cancel_flag 参数,用于检测取消信号
pub fn convert_from_path(file_path: &str, cancel_flag: Arc<AtomicBool>) -> Result<(), Box<dyn Error>> {
    println!("开始时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
    
    // 从文件路径获取基本文件名
    let base_name = file_path.split('.').next().unwrap_or("not_found_file");

    // 在开始计数前检测取消标志
    if cancel_flag.load(Ordering::SeqCst) {
        println!("取消操作:在计数阶段");
        return Err("Operation cancelled".into());
    }
    let total_rows = count_csv_rows(file_path)?;
    println!("文件总行数(不含表头):{}", total_rows);
    let total_files = (total_rows + ROWS_PER_FILE - 1) / ROWS_PER_FILE;
    println!("预计生成文件数:{}", total_files);

    // 打开 CSV 文件,并使用 8MB 缓冲区读取
    let file = File::open(file_path)?;
    let buf_reader = BufReader::with_capacity(8 * 1024 * 1024, file);
    let mut reader = ReaderBuilder::new().has_headers(true).from_reader(buf_reader);

    // 获取表头
    let headers: Vec<String> = reader
        .headers()?
        .iter()
        .map(|s| s.to_string())
        .collect();

    let mut file_index = 1;
    let mut current_chunk: Vec<Vec<String>> = Vec::with_capacity(ROWS_PER_FILE);

    // 逐条处理 CSV 记录
    for result in reader.records() {
        // 每次读取记录前检查取消信号
        if cancel_flag.load(Ordering::SeqCst) {
            println!("检测到取消信号,提前退出转换过程");
            return Err("Operation cancelled".into());
        }

        let record: StringRecord = result?;
        let row_data: Vec<String> = record.iter().map(|s| s.to_string()).collect();
        current_chunk.push(row_data);

        // 当累计达到设定行数时,写入一个 Excel 文件
        if current_chunk.len() == ROWS_PER_FILE {
            // 写入前再次检查取消信号
            if cancel_flag.load(Ordering::SeqCst) {
                println!("检测到取消信号,在写入文件前取消");
                return Err("Operation cancelled".into());
            }
            let output_file_name = format!("{}-part{}.xlsx", base_name, file_index);
            write_to_excel(&output_file_name, &headers, &current_chunk,  Arc::clone(&cancel_flag))?;
            current_chunk.clear();
            file_index += 1;
        }
    }

    // 如果剩余未满一批,则写入最后一个文件之前检查取消
    if !current_chunk.is_empty() {
        if cancel_flag.load(Ordering::SeqCst) {
            println!("检测到取消信号,在写入剩余文件前取消");
            return Err("Operation cancelled".into());
        }
        let output_file_name = format!("{}-part{}.xlsx", base_name, file_index);
        write_to_excel(&output_file_name, &headers, &current_chunk,  Arc::clone(&cancel_flag))?;
    }

    println!("结束时间:{}", Local::now().format("%Y-%m-%d %H:%M:%S"));
    Ok(())
}

// 将指定数据写入 Excel 文件的函数(这里暂不增加内部取消检查,如有需要可在循环中添加)
fn write_to_excel(
    file_name: &str,
    headers: &[String],
    data: &[Vec<String>],
    cancel_flag: Arc<AtomicBool>
) -> Result<(), Box<dyn Error>> {
    // 创建 Excel 工作簿对象(此处以 Workbook 为例,你可以使用相应的 Excel 写入库)
    let mut workbook = Workbook::new();
    let worksheet = workbook.add_worksheet_with_constant_memory();

    // 写入表头前检查是否被取消
    if cancel_flag.load(Ordering::SeqCst) {
        println!("取消信号:在写入表头前取消");
        // return Err(XlsxError("Operation cancelled".to_string()));
        return Err("Operation cancelled".into());
    }

    // 写入表头
    worksheet.write_row(0, 0, headers)?;

    // 逐行写入数据
    for (row_index, row) in data.iter().enumerate() {
        if cancel_flag.load(Ordering::SeqCst) {
            println!("取消信号:在写入第 {} 行时取消", row_index + 1);
            return Err("Operation cancelled".into());
        }
        worksheet.write_row((row_index + 1) as u32, 0, row)?;
    }
    workbook.save(file_name)?;
    Ok(())
}

// 计算 CSV 文件的行数(不包含表头)
fn count_csv_rows(file_path: &str) -> Result<usize, Box<dyn Error>> {
    let file = File::open(file_path)?;
    let reader = BufReader::with_capacity(8 * 1024 * 1024, file);
    // 计算行数并减去表头
    let total_rows = reader.lines().count().saturating_sub(1);
    Ok(total_rows)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn it_works() {
        let result = add(2, 2);
        assert_eq!(result, 4);
    }


    #[test]
    fn test_read_columns_headers() {
        // 创建一个临时 CSV 文件,写入测试数据
        let mut temp_file = NamedTempFile::new().expect("创建临时文件失败");
        writeln!(temp_file, "name,age,city").expect("写入测试数据失败");
        writeln!(temp_file, "Alice,30,New York").expect("写入测试数据失败");
        
        // 调用库函数
        let file = "data.csv";
        let headers = read_columns_headers(file);
        // println!("headers, {:?}", headers);
        println!("-------------------");
        // let headers = read_columns_headers(temp_file.path().to_str().unwrap()).expect("读取 CSV 表头失败");
        
        // 检查表头内容
        let expected = csv::StringRecord::from(vec!["name", "age", "city"]);
        // assert_eq!(headers, expected);
    }

    #[test]
    fn test_convert_from_path() {
        // 调用库函数
        let file = "data.csv";
        let flag = Arc::new(AtomicBool::new(false));
        convert_from_path(file, flag);
    }
}