1use std::fs::File;
2use std::io::BufReader;
3use std::str::FromStr;
4use std::sync::Arc;
5use calamine::Sheets;
6use csv::{ReaderBuilder, StringRecord};
7use heck::ToSnakeCase;
8use tokio::sync::mpsc;
9use serde_json::{Number, Value};
10use simple_string_patterns::*;
11use indexmap::IndexMap;
12use std::path::Path;
13
14use calamine::{open_workbook_auto, Data, Reader};
15use fuzzy_datetime::{iso_fuzzy_to_date_string, iso_fuzzy_to_datetime_string};
16use crate::headers::*;
17use crate::data_set::*;
18use crate::helpers::float_value;
19use crate::helpers::string_value;
20use crate::is_truthy::*;
21use crate::round_decimal::RoundDecimal;
22use crate::Extension;
23use crate::Format;
24use crate::OptionSet;
25use crate::euro_number_format::is_euro_number_format;
26use crate::PathData;
27use crate::RowOptionSet;
28use crate::error::GenericError;
29
30pub fn process_spreadsheet_direct(opts: &OptionSet) -> Result<ResultSet, GenericError> {
33 let rt = tokio::runtime::Runtime::new().unwrap();
34 rt.block_on(process_spreadsheet_core(opts, None, None))
35}
36
37pub async fn process_spreadsheet_immediate(opts: &OptionSet) -> Result<ResultSet, GenericError> {
41 process_spreadsheet_core(opts, None, None).await
42}
43
44#[deprecated(
45 since = "1.0.6",
46 note = "This function is a wrapper for the renamed function `process_spreadsheet_inline`"
47)]
48pub async fn render_spreadsheet_direct(opts: &OptionSet) -> Result<ResultSet, GenericError> {
49 process_spreadsheet_core(opts, None, None).await
50}
51
52pub async fn process_spreadsheet_async(
54 opts: &OptionSet,
55 save_func: Box<dyn Fn(IndexMap<String, Value>) -> Result<(), GenericError> + Send + Sync>,
56 out_ref: Option<&str>
57 ) -> Result<ResultSet, GenericError> {
58 process_spreadsheet_core(opts, Some(save_func), out_ref).await
59}
60
61pub async fn process_spreadsheet_core(
64 opts: &OptionSet,
65 save_opt: Option<Box<dyn Fn(IndexMap<String, Value>) -> Result<(), GenericError> + Send + Sync>>,
66 out_ref: Option<&str>
67) -> Result<ResultSet, GenericError> {
68 if let Some(filepath) = opts.path.clone() {
69 let path = Path::new(&filepath);
70 if !path.exists() {
71 #[allow(dead_code)]
72 return Err(GenericError("file_unavailable"));
73 }
74 let path_data = PathData::new(path);
75 if path_data.is_valid() {
76 if path_data.use_calamine() {
77 read_workbook_core(&path_data, opts, save_opt, out_ref).await
78 } else {
79 read_csv_core(&path_data, opts, save_opt, out_ref).await
80 }
81 } else {
82 Err(GenericError("unsupported_format"))
83 }
84 } else {
85 Err(GenericError("no_filepath_specified"))
86 }
87}
88
89
90#[deprecated(
91 since = "1.0.6",
92 note = "This function is a wrapper for the renamed function `process_spreadsheet_core`"
93)]
94pub async fn render_spreadsheet_core(
95 opts: &OptionSet,
96 save_opt: Option<Box<dyn Fn(IndexMap<String, Value>) -> Result<(), GenericError> + Send + Sync>>,
97 out_ref: Option<&str>
98) -> Result<ResultSet, GenericError> {
99 process_spreadsheet_core(opts, save_opt, out_ref).await
100}
101
102pub async fn read_workbook_core<'a>(
105 path_data: &PathData<'a>,
106 opts: &OptionSet,
107 save_opt: Option<Box<dyn Fn(IndexMap<String, Value>) -> Result<(), GenericError> + Send + Sync>>,
108 out_ref: Option<&str>
109) -> Result<ResultSet, GenericError> {
110 if let Ok(mut workbook) = open_workbook_auto(path_data.path()) {
111 let max_rows = opts.max_rows();
112 let (selected_names, sheet_names, _sheet_indices) = match_sheet_name_and_index(&mut workbook, opts);
113
114
115 if selected_names.len() > 0 {
116 let info = WorkbookInfo::new(path_data, &selected_names, &sheet_names);
117
118 if opts.multimode() {
119 read_multiple_worksheets(&mut workbook, &sheet_names, opts, &info, max_rows).await
120 } else {
121 let sheet_ref = &selected_names[0];
122 read_single_worksheet(workbook, sheet_ref, opts, &info, save_opt, out_ref).await
123 }
124 } else {
125 Err(GenericError("workbook_with_no_sheets"))
126 }
127 } else {
128 Err(GenericError("cannot_open_workbook"))
129 }
130}
131
132async fn read_multiple_worksheets(
134 workbook: &mut Sheets<BufReader<File>>,
135 sheet_names: &[String],
136 opts: &OptionSet,
137 info: &WorkbookInfo,
138 max_rows: usize
139) -> Result<ResultSet, GenericError> {
140 let mut sheets: Vec<SheetDataSet> = vec![];
141 let mut sheet_index: usize = 0;
142 let capture_rows = opts.capture_rows();
143 for sheet_ref in sheet_names {
144 let range = workbook.worksheet_range(&sheet_ref.clone())?;
145 let mut headers: Vec<String> = vec![];
146 let mut has_headers = false;
147 let capture_headers = !opts.omit_header;
148 let source_rows = range.rows();
149 let mut rows: Vec<IndexMap<String, Value>> = vec![];
150 let mut row_index = 0;
151 let header_row_index = opts.header_row_index();
152 let mut col_keys: Vec<String> = vec![];
153 let columns = if sheet_index == 0 {
154 opts.rows.columns.clone()
155 } else {
156 vec![]
157 };
158 let match_header_row_below = capture_headers && header_row_index > 0;
159 if let Some(first_row) = range.headers() {
160
161 headers = build_header_keys(&first_row, &columns, &opts.field_mode);
162 has_headers = !match_header_row_below;
163 col_keys = first_row;
164 }
165 let total = source_rows.clone().count();
166 if capture_rows || match_header_row_below {
167 let max_row_count = if capture_rows {
168 max_rows
169 } else {
170 header_row_index + 2
171 };
172 let max_take = if total < max_row_count {
173 total
174 } else {
175 max_row_count + 1
176 };
177 for row in source_rows.clone().take(max_take) {
178 if row_index > max_row_count {
179 break;
180 }
181 if match_header_row_below && (row_index + 1) == header_row_index {
182 let h_row = row.into_iter().map(|c| c.to_string().to_snake_case()).collect::<Vec<String>>();
183 headers = build_header_keys(&h_row, &columns, &opts.field_mode);
184 has_headers = true;
185 } else if (has_headers || !capture_headers) && capture_rows {
186 let row_map = workbook_row_to_map(row, &opts.rows, &headers);
187 if is_not_header_row(&row_map, row_index, &col_keys) {
188 rows.push(row_map);
189 }
190 }
191 row_index += 1;
192 }
193 }
194 sheets.push(SheetDataSet::new(&sheet_ref, &headers, &rows, total));
195 sheet_index += 1;
196 }
197 Ok(ResultSet::from_multiple(&sheets, &info, opts))
198}
199
200pub async fn read_single_worksheet(
202 mut workbook: Sheets<BufReader<File>>,
203 sheet_ref: &str,
204 opts: &OptionSet,
205 info: &WorkbookInfo,
206 save_opt: Option<Box<dyn Fn(IndexMap<String, Value>) -> Result<(), GenericError> + Send + Sync>>,
207 out_ref: Option<&str>,
208) -> Result<ResultSet, GenericError> {
209 let range = workbook.worksheet_range(sheet_ref)?;
210 let capture_rows = opts.capture_rows();
211 let columns = opts.rows.columns.clone();
212 let max_rows = opts.max_rows();
213 let mut headers: Vec<String> = vec![];
214 let mut col_keys: Vec<String> = vec![];
215 let mut has_headers = false;
216 let capture_headers = !opts.omit_header;
217 let source_rows = range.rows();
218 let mut rows: Vec<IndexMap<String, Value>> = vec![];
219 let mut row_index = 0;
220 let header_row_index = opts.header_row_index();
221 let match_header_row_below = capture_headers && header_row_index > 0;
222
223 if let Some(first_row) = range.headers() {
224 headers = build_header_keys(&first_row, &columns, &opts.field_mode);
225 has_headers = !match_header_row_below;
226 col_keys = first_row;
227 }
228 let total = source_rows.clone().count();
229 if capture_rows || match_header_row_below {
230 let max_row_count = if capture_rows {
231 max_rows
232 } else {
233 header_row_index + 2
234 };
235 let max_take = if total < max_row_count {
236 total
237 } else {
238 max_row_count + 1
239 };
240 for row in source_rows.clone().take(max_take) {
241 if row_index > max_row_count {
242 break;
243 }
244 if match_header_row_below && (row_index + 1) == header_row_index {
245 let h_row = row.into_iter().map(|c| c.to_string().to_snake_case()).collect::<Vec<String>>();
246 headers = build_header_keys(&h_row, &columns, &opts.field_mode);
247 has_headers = true;
248 } else if (has_headers || !capture_headers) && capture_rows {
249 let row_map = workbook_row_to_map(row, &opts.rows, &headers);
251 if is_not_header_row(&row_map, row_index,&col_keys) {
252 rows.push(row_map);
253 }
254 }
255 row_index += 1;
256 }
257 }
258 if let Some(save_method) = save_opt {
259 let (tx, mut rx) = mpsc::channel(32);
260 let opts = Arc::new(opts.clone()); let headers = headers.clone();
262 let col_keys = col_keys.clone(); let sheet_name = sheet_ref.to_string().clone();
264 tokio::spawn(async move {
265 if let Ok(range) = workbook.worksheet_range(&sheet_name) {
266 let mut source_rows = range.rows();
267 if let Some(first_row) = source_rows.next() {
268 let first_row_map = workbook_row_to_map(&first_row, &opts.rows, &headers);
269 if is_not_header_row(&first_row_map, 0, &col_keys) {
271 if tx.send(first_row_map).await.is_err() {
272 return; }
274 }
275 }
276
277 for row in source_rows {
279 let row_map = workbook_row_to_map(&row, &opts.rows, &headers);
280 if tx.send(row_map).await.is_err() {
281 break; }
283 }
284 }
285 });
286 while let Some(row) = rx.recv().await {
288 save_method(row)?;
289 }
290 }
291
292 let ds = DataSet::from_count_and_rows(total, rows, opts);
293 Ok(ResultSet::new(info, &headers, ds, opts, out_ref))
294}
295
296pub async fn read_csv_core<'a>(
299 path_data: &PathData<'a>,
300 opts: &OptionSet,
301 save_opt: Option<Box<dyn Fn(IndexMap<String, Value>) -> Result<(), GenericError> + Send + Sync>>,
302 out_ref: Option<&str>
303) -> Result<ResultSet, GenericError> {
304 let separator = match path_data.mode() {
305 Extension::Tsv => b't',
306 _ => b',',
307 };
308 if let Ok(mut rdr) = ReaderBuilder::new().delimiter(separator).from_path(path_data.path()) {
309 let capture_header = opts.omit_header == false;
310 let mut rows: Vec<IndexMap<String, Value>> = vec![];
311 let mut line_count = 0;
312 let has_max = opts.max.is_some();
313
314 let max_line_usize = opts.max_rows();
315 let mut headers: Vec<String> = vec![];
316 let capture_rows = opts.capture_rows();
317 if capture_header {
318 if let Ok(hdrs) = rdr.headers() {
319 headers = hdrs.into_iter().map(|s| s.to_owned()).collect();
320 }
321 let columns = opts.rows.columns.clone();
322 headers = build_header_keys(&headers, &columns, &opts.field_mode);
323 }
324
325 let mut total = 0;
326 if capture_rows {
327 for result in rdr.records() {
328 if has_max && line_count >= max_line_usize {
329 break;
330 }
331 if let Some(row) = csv_row_result_to_values(result, Arc::new(&opts.rows)) {
332 rows.push(to_index_map(&row, &headers));
333 line_count += 1;
334 }
335 }
336 total = line_count + rdr.records().count() + 1;
337 } else {
338 if let Ok(mut count_rdr) = ReaderBuilder::new().from_path(&path_data.path()) {
340 total = count_rdr.records().count();
341 }
342 if let Some(save_method) = save_opt {
344 let (tx, mut rx) = mpsc::channel(32);
345 let opts = Arc::new(opts.clone()); let headers = headers.clone(); tokio::spawn(async move {
348 for result in rdr.records() {
349 if let Some(row) = csv_row_result_to_values(result, Arc::new(&opts.rows)) {
350 let row_map = to_index_map(&row, &headers);
351 if tx.send(row_map).await.is_err() {
352 break;
354 }
355 }
356 }
357 });
358
359 while let Some(row) = rx.recv().await {
361 save_method(row)?;
362 }
363 }
364 }
365 let info = WorkbookInfo::simple(path_data);
366 let ds = DataSet::from_count_and_rows(total, rows, opts);
367 Ok(ResultSet::new(&info, &headers, ds, opts, out_ref))
368 } else {
369 let error_msg = match path_data.ext() {
370 Extension::Tsv => "unreadable_tsv_file",
371 _ => "unreadable_csv_file"
372 };
373 Err(GenericError(error_msg))
374 }
375}
376
377fn workbook_row_to_map(row: &[Data], opts: &RowOptionSet, headers: &[String]) -> IndexMap<String, Value> {
379 to_index_map(&workbook_row_to_values(row, &opts), headers)
380}
381
382fn workbook_row_to_values(row: &[Data], opts: &RowOptionSet) -> Vec<Value> {
384 let mut c_index = 0;
385 let mut cells: Vec<Value> = vec![];
386 for cell in row {
387 let value = workbook_cell_to_value(cell, Arc::new(opts), c_index);
388 cells.push(value);
389 c_index += 1;
390 }
391 cells
392}
393
394fn workbook_cell_to_value(cell: &Data, opts: Arc<&RowOptionSet>, c_index: usize) -> Value {
396 let col = opts.column(c_index);
397 let format = col.map_or(Format::Auto, |c| c.format.to_owned());
398 let def_val = col.and_then(|c| c.default.clone());
399
400 match cell {
401 Data::Int(i) => Value::Number(Number::from_i128(*i as i128).unwrap()),
402 Data::Float(f) => process_float_value(*f, format),
403 Data::DateTimeIso(d) => process_iso_datetime_value(d, def_val, opts.date_only),
404 Data::DateTime(d) => process_excel_datetime_value(d, def_val, opts.date_only),
405 Data::Bool(b) => Value::Bool(*b),
406 Data::String(s) => process_string_value(s, format, def_val),
407 Data::Empty => def_val.unwrap_or(Value::Null),
408 _ => Value::String(cell.to_string()),
409 }
410}
411
412fn process_float_value(value: f64, format: Format) -> Value {
413 match format {
414 Format::Integer => Value::Number(Number::from_i128(value as i128).unwrap()),
415 Format::Boolean => Value::Bool(value >= 1.0),
416 Format::Text => Value::String(value.to_string()),
417 _ => Value::Number(Number::from_f64(value).unwrap()),
418 }
419}
420
421fn process_excel_datetime_value(
422 datetime: &calamine::ExcelDateTime,
423 def_val: Option<Value>,
424 date_only: bool
425) -> Value {
426 let dt_ref = datetime.as_datetime().map_or_else(
427 || def_val.unwrap_or(Value::Null),
428 |dt| {
429 let formatted_date = if date_only {
430 dt.format("%Y-%m-%d").to_string()
431 } else {
432 dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
433 };
434 Value::String(formatted_date)
435 }
436 );
437 dt_ref
438}
439
440fn process_iso_datetime_value(
441 dt_str: &str,
442 def_val: Option<Value>,
443 date_only: bool
444) -> Value {
445 if date_only {
446 iso_fuzzy_to_date_string(dt_str).map_or_else(
447 || def_val.unwrap_or(Value::Null),
448 |dt| Value::String(dt)
449 )
450 } else {
451 iso_fuzzy_to_datetime_string(dt_str).map_or_else(
452 || def_val.unwrap_or(Value::Null),
453 |dt| Value::String(dt)
454 )
455 }
456}
457
458fn process_string_value(value: &str, format: Format, def_val: Option<Value>) -> Value {
459 match format {
460 Format::Boolean => process_truthy_value(value, def_val, is_truthy_core),
461 Format::Truthy => process_truthy_value(value, def_val, is_truthy_standard),
462 Format::TruthyCustom(opts) => process_truthy_value(value, def_val, |v, _| is_truthy_custom(v, &opts, false, false)),
463 Format::Decimal(places) => process_numeric_value(value, def_val, |n| float_value(n.round_decimal(places))),
464 Format::Float => process_numeric_value(value, def_val, float_value),
465 Format::Date => process_date_value(value, def_val, iso_fuzzy_to_date_string),
466 Format::DateTime => process_date_value(value, def_val, iso_fuzzy_to_datetime_string),
467 _ => Value::String(value.to_owned()),
468 }
469}
470
471fn process_truthy_value<F>(value: &str, def_val: Option<Value>, truthy_fn: F) -> Value
472where
473 F: Fn(&str, bool) -> Option<bool>,
474{
475 if let Some(is_true) = truthy_fn(value, false) {
476 Value::Bool(is_true)
477 } else {
478 def_val.unwrap_or(Value::Null)
479 }
480}
481
482fn process_numeric_value<F>(value: &str, def_val: Option<Value>, numeric_fn: F) -> Value
483where
484 F: Fn(f64) -> Value,
485{
486 if let Some(n) = value.to_first_number::<f64>() {
487 numeric_fn(n)
488 } else {
489 def_val.unwrap_or(Value::Null)
490 }
491}
492
493fn process_date_value<F>(value: &str, def_val: Option<Value>, date_fn: F) -> Value
494where
495 F: Fn(&str) -> Option<String>,
496{
497 if let Some(date_str) = date_fn(value) {
498 string_value(&date_str)
499 } else {
500 def_val.unwrap_or(Value::Null)
501 }
502}
503
504fn csv_row_result_to_values(result: Result<StringRecord, csv::Error>, opts: Arc<&RowOptionSet>) -> Option<Vec<Value>> {
506 if let Ok(record) = result {
507 let mut row: Vec<Value> = vec![];
508 let mut ci: usize = 0;
509 for cell in record.into_iter() {
510 let new_cell = csv_cell_to_json_value(cell, opts.clone(), ci);
511 row.push(new_cell);
512 ci += 1;
513 }
514 return Some(row)
515 }
516 None
517}
518
519fn csv_cell_to_json_value(cell: &str, opts: Arc<&RowOptionSet>, index: usize) -> Value {
521 let has_number = cell.to_first_number::<f64>().is_some();
522 let col = opts.column(index);
524 let fmt = if let Some(c) = col.cloned() {
525 c.format
526 } else {
527 Format::Auto
528 };
529 let euro_num_mode = if let Some(c) = col.cloned() {
530 c.decimal_comma
531 } else {
532 opts.decimal_comma
533 };
534 let num_cell = if has_number {
535 let euro_num_mode = is_euro_number_format(cell, euro_num_mode);
536 if euro_num_mode {
537 cell.replace(",", ".").replace(",", ".")
538 } else {
539 cell.replace(",", "")
540 }
541 } else {
542 cell.to_owned()
543 };
544 let mut new_cell = Value::Null;
545 if num_cell.len() > 0 && num_cell.is_numeric() {
546 if let Ok(float_val) = serde_json::Number::from_str(&num_cell) {
547 match fmt {
548 Format::Integer => {
549 if let Some(int_val) = Number::from_i128(float_val.as_i128().unwrap_or(0)) {
550 new_cell = Value::Number(int_val);
551 }
552 },
553 Format::Boolean => {
554 new_cell = Value::Bool(float_val.as_f64().unwrap_or(0f64) >= 1.0);
556 },
557 _ => {
558 new_cell = Value::Number(float_val);
559 }
560 }
561 }
562 } else if let Some(is_true) = is_truthy_core(cell, false) {
563 new_cell = Value::Bool(is_true);
564 } else {
565 new_cell = match fmt {
566 Format::Truthy => {
567 if let Some(is_true) = is_truthy_standard(cell, false) {
568 Value::Bool(is_true)
569 } else {
570 Value::Null
571 }
572 }
573 _ => Value::String(cell.to_string())
574 };
575 }
576 new_cell
577}
578
579
580
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::{helpers::*, Column};

    use super::*;

    /// Reading the sample XLSX file with a generous row limit reports 401 rows.
    #[test]
    fn test_direct_processing_xlsx() {
        let opts = OptionSet::new("data/sample-data-1.xlsx").max_row_count(1_000);

        let result_set = process_spreadsheet_direct(&opts).unwrap();

        assert_eq!(result_set.num_rows, 401);
    }

    /// Reading the sample CSV file with a generous row limit reports 401 rows.
    #[test]
    fn test_direct_processing_csv() {
        let opts = OptionSet::new("data/sample-data-1.csv").max_row_count(1_000);

        let result_set = process_spreadsheet_direct(&opts).unwrap();

        assert_eq!(result_set.num_rows, 401);
    }

    /// Preview mode on the sample ODS file reports both sheets and the row
    /// total, while the first sheet holds only the 10 requested rows.
    #[test]
    fn test_multisheet_preview_ods() {
        let opts = OptionSet::new("data/sample-data-2.ods")
            .max_row_count(10)
            .read_mode_preview();

        let dataset = process_spreadsheet_direct(&opts).unwrap();

        assert_eq!(dataset.sheets.len(), 2);
        assert_eq!(dataset.num_rows, 118);
        assert_eq!(dataset.data.first_sheet().len(), 10);
    }

    /// Float and truthy column overrides extract values from annotated strings.
    #[test]
    fn test_column_override_1() {
        let cells = json_object_to_calamine_data(json!({
            "sku": "CHAIR16",
            "height": "112cm",
            "width": "69cm",
            "approved": "Y"
        }));

        let cols = vec![
            Column::new_format(Format::Text, Some(string_value(""))),
            Column::new_format(Format::Float, Some(float_value(95.0))),
            Column::new_format(Format::Float, Some(float_value(65.0))),
            Column::new_format(Format::Truthy, Some(bool_value(false))),
        ];
        let row_opts = RowOptionSet::simple(&cols);

        let result = workbook_row_to_values(&cells, &row_opts);

        assert_eq!(result.get(1).unwrap(), 112.0);
        assert_eq!(result.get(2).unwrap(), 69.0);
        assert_eq!(result.get(3).unwrap(), true);
    }

    /// Date, float and custom-truthy overrides normalise loosely formatted text.
    #[test]
    fn test_column_override_2() {
        let cells = json_object_to_calamine_data(json!({
            "name": "Sophia",
            "dob": "2001-9-23",
            "weight": "62kg",
            "result": "GOOD"
        }));

        let cols = vec![
            Column::new_format(Format::Text, None),
            Column::new_format(Format::Date, None),
            Column::new_format(Format::Float, None),
            Column::new_format(Format::truthy_custom("good", "bad"), Some(bool_value(false))),
        ];
        let row_opts = RowOptionSet::simple(&cols);

        let result = workbook_row_to_values(&cells, &row_opts);

        assert_eq!(result.get(1).unwrap(), "2001-09-23");
        assert_eq!(result.get(2).unwrap(), 62.0);
        assert_eq!(result.get(3).unwrap(), true);
    }
}