1use std::collections::BTreeMap;
7use std::str::FromStr;
8
9use base64::Engine;
10use chrono::DateTime;
11use csv_async::{AsyncReaderBuilder, StringRecord, Trim};
12use futures::StreamExt;
13use go_parse_duration::parse_duration;
14use ordered_float::OrderedFloat;
15use tokio::io::AsyncRead;
16
17use crate::error::{Error, Result};
18use crate::types::{DataType, FluxRecord, FluxTableMetadata};
19use crate::value::Value;
20
/// Section of the annotated CSV input that the parser is currently reading.
///
/// The state decides how a row whose first cell is empty gets interpreted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ParsingState {
    /// Rows with an empty first cell are parsed as data rows.
    Normal,
    /// An annotation row (`#...`) has opened a new table; the next row with an
    /// empty first cell is treated as that table's header row.
    Annotation,
    /// The header row's first column was `error`; rows with an empty first
    /// cell are converted into query errors.
    Error,
}
39
40enum RowAction {
42 Continue,
44 Record(FluxRecord),
46 Error(Error),
48}
49
50pub struct AnnotatedCsvParser<R: AsyncRead + Unpin> {
72 csv: csv_async::AsyncReader<R>,
73 table_position: i32,
74 table: Option<FluxTableMetadata>,
75 parsing_state: ParsingState,
76 data_type_annotation_found: bool,
77}
78
79impl<R: AsyncRead + Unpin + Send> AnnotatedCsvParser<R> {
80 pub fn new(reader: R) -> Self {
82 let csv = AsyncReaderBuilder::new()
83 .has_headers(false) .trim(Trim::Fields)
85 .flexible(true)
86 .create_reader(reader);
87
88 Self {
89 csv,
90 table_position: 0,
91 table: None,
92 parsing_state: ParsingState::Normal,
93 data_type_annotation_found: false,
94 }
95 }
96
97 pub async fn next(&mut self) -> Result<Option<FluxRecord>> {
104 let mut records = self.csv.records();
105
106 loop {
107 let row = match records.next().await {
108 Some(Ok(r)) => r,
109 Some(Err(e)) => return Err(Error::Csv(format!("CSV read error: {}", e))),
110 None => return Ok(None), };
112
113 if row.len() <= 1 {
115 continue;
116 }
117
118 if detect_annotation_start(
120 &row,
121 self.parsing_state,
122 &mut self.table,
123 &mut self.table_position,
124 &mut self.parsing_state,
125 &mut self.data_type_annotation_found,
126 ) {
127 }
129
130 let table = match &mut self.table {
132 Some(t) => t,
133 None => {
134 return Err(Error::MissingAnnotation(
135 "No annotations found before data".to_string(),
136 ));
137 }
138 };
139
140 if row.len() - 1 != table.columns.len() {
142 return Err(Error::ColumnMismatch {
143 expected: table.columns.len(),
144 actual: row.len() - 1,
145 });
146 }
147
148 let action = process_row(
150 &row,
151 table,
152 self.parsing_state,
153 self.data_type_annotation_found,
154 &mut self.parsing_state,
155 &mut self.data_type_annotation_found,
156 )?;
157
158 match action {
159 RowAction::Continue => continue,
160 RowAction::Record(record) => return Ok(Some(record)),
161 RowAction::Error(e) => return Err(e),
162 }
163 }
164 }
165}
166
167fn detect_annotation_start(
170 row: &StringRecord,
171 current_state: ParsingState,
172 table: &mut Option<FluxTableMetadata>,
173 table_position: &mut i32,
174 parsing_state: &mut ParsingState,
175 data_type_annotation_found: &mut bool,
176) -> bool {
177 if let Some(first) = row.get(0) {
178 if !first.is_empty() && first.starts_with('#') && current_state == ParsingState::Normal {
179 *table = Some(FluxTableMetadata::new(*table_position, row.len() - 1));
181 *table_position += 1;
182 *parsing_state = ParsingState::Annotation;
183 *data_type_annotation_found = false;
184 return true;
185 }
186 }
187 false
188}
189
190fn process_row(
192 row: &StringRecord,
193 table: &mut FluxTableMetadata,
194 current_state: ParsingState,
195 current_datatype_found: bool,
196 parsing_state: &mut ParsingState,
197 data_type_annotation_found: &mut bool,
198) -> Result<RowAction> {
199 let first_cell = row.get(0).unwrap_or_default();
200
201 match first_cell {
202 "" => process_empty_first_cell(
203 row,
204 table,
205 current_state,
206 current_datatype_found,
207 parsing_state,
208 ),
209 "#datatype" => {
210 process_datatype_annotation(row, table, data_type_annotation_found)?;
211 Ok(RowAction::Continue)
212 }
213 "#group" => {
214 process_group_annotation(row, table);
215 Ok(RowAction::Continue)
216 }
217 "#default" => {
218 process_default_annotation(row, table);
219 Ok(RowAction::Continue)
220 }
221 other => Err(Error::Parse {
222 message: format!("Invalid first cell: {}", other),
223 }),
224 }
225}
226
227fn process_empty_first_cell(
229 row: &StringRecord,
230 table: &mut FluxTableMetadata,
231 current_state: ParsingState,
232 data_type_annotation_found: bool,
233 parsing_state: &mut ParsingState,
234) -> Result<RowAction> {
235 match current_state {
236 ParsingState::Annotation => {
237 process_header_row(row, table, data_type_annotation_found, parsing_state)
238 }
239 ParsingState::Error => Ok(RowAction::Error(parse_error_response(row))),
240 ParsingState::Normal => parse_data_row(row, table),
241 }
242}
243
244fn process_header_row(
246 row: &StringRecord,
247 table: &mut FluxTableMetadata,
248 data_type_annotation_found: bool,
249 parsing_state: &mut ParsingState,
250) -> Result<RowAction> {
251 if !data_type_annotation_found {
252 return Err(Error::MissingAnnotation(
253 "#datatype annotation not found".to_string(),
254 ));
255 }
256
257 if row.get(1).unwrap_or_default() == "error" {
259 *parsing_state = ParsingState::Error;
260 return Ok(RowAction::Continue);
261 }
262
263 for i in 1..row.len() {
265 if let Some(name) = row.get(i) {
266 table.columns[i - 1].name = name.to_string();
267 }
268 }
269 *parsing_state = ParsingState::Normal;
270
271 Ok(RowAction::Continue)
272}
273
274fn parse_error_response(row: &StringRecord) -> Error {
276 let message = row
277 .get(1)
278 .filter(|s| !s.is_empty())
279 .map(|s| s.to_string())
280 .unwrap_or_else(|| "Unknown query error".to_string());
281
282 let reference = row.get(2).filter(|s| !s.is_empty()).map(|s| s.to_string());
283
284 Error::QueryError { message, reference }
285}
286
287fn parse_data_row(row: &StringRecord, table: &FluxTableMetadata) -> Result<RowAction> {
289 let mut values = BTreeMap::new();
290
291 for i in 1..row.len() {
292 let col = &table.columns[i - 1];
293 let raw_value = row.get(i).unwrap_or_default();
294 let value = if raw_value.is_empty() {
295 &col.default_value
296 } else {
297 raw_value
298 };
299
300 let parsed = parse_value(value, col.data_type, &col.name)?;
301 values.insert(col.name.clone(), parsed);
302 }
303
304 Ok(RowAction::Record(FluxRecord {
305 table: table.position,
306 values,
307 }))
308}
309
310fn process_datatype_annotation(
312 row: &StringRecord,
313 table: &mut FluxTableMetadata,
314 data_type_annotation_found: &mut bool,
315) -> Result<()> {
316 *data_type_annotation_found = true;
317
318 for i in 1..row.len() {
319 if let Some(type_str) = row.get(i) {
320 let dt = DataType::from_str(type_str)?;
321 table.columns[i - 1].data_type = dt;
322 }
323 }
324
325 Ok(())
326}
327
328fn process_group_annotation(row: &StringRecord, table: &mut FluxTableMetadata) {
330 for i in 1..row.len() {
331 if let Some(value) = row.get(i) {
332 table.columns[i - 1].group = value == "true";
333 }
334 }
335}
336
337fn process_default_annotation(row: &StringRecord, table: &mut FluxTableMetadata) {
339 for i in 1..row.len() {
340 if let Some(value) = row.get(i) {
341 table.columns[i - 1].default_value = value.to_string();
342 }
343 }
344}
345
346fn parse_value(s: &str, data_type: DataType, column_name: &str) -> Result<Value> {
348 if s.is_empty() && data_type != DataType::String {
350 return Ok(Value::Null);
351 }
352
353 match data_type {
354 DataType::String => Ok(Value::String(s.to_string())),
355 DataType::Double => {
356 let v = s.parse::<f64>().map_err(|e| Error::Parse {
357 message: format!("Invalid double '{}' for column '{}': {}", s, column_name, e),
358 })?;
359 Ok(Value::Double(OrderedFloat::from(v)))
360 }
361 DataType::Bool => {
362 let v = s.to_lowercase() != "false";
363 Ok(Value::Bool(v))
364 }
365 DataType::Long => {
366 let v = s.parse::<i64>().map_err(|e| Error::Parse {
367 message: format!("Invalid long '{}' for column '{}': {}", s, column_name, e),
368 })?;
369 Ok(Value::Long(v))
370 }
371 DataType::UnsignedLong => {
372 let v = s.parse::<u64>().map_err(|e| Error::Parse {
373 message: format!(
374 "Invalid unsignedLong '{}' for column '{}': {}",
375 s, column_name, e
376 ),
377 })?;
378 Ok(Value::UnsignedLong(v))
379 }
380 DataType::Duration => {
381 let nanos = parse_duration(s).map_err(|_| Error::Parse {
382 message: format!("Invalid duration '{}' for column '{}'", s, column_name),
383 })?;
384 Ok(Value::Duration(chrono::Duration::nanoseconds(nanos)))
385 }
386 DataType::Base64Binary => {
387 let bytes = base64::engine::general_purpose::STANDARD
388 .decode(s)
389 .map_err(|e| Error::Parse {
390 message: format!("Invalid base64 '{}' for column '{}': {}", s, column_name, e),
391 })?;
392 Ok(Value::Base64Binary(bytes))
393 }
394 DataType::TimeRFC => {
395 let t = DateTime::parse_from_rfc3339(s).map_err(|e| Error::Parse {
396 message: format!(
397 "Invalid RFC3339 timestamp '{}' for column '{}': {}",
398 s, column_name, e
399 ),
400 })?;
401 Ok(Value::TimeRFC(t))
402 }
403 }
404}
405
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Datelike, Timelike};
    use std::io::Cursor;

    /// Parses `s` as `data_type` for a column named `test`, panicking on failure.
    fn parse_ok(s: &str, data_type: DataType) -> Value {
        parse_value(s, data_type, "test").unwrap()
    }

    // ------------------------------------------------------------------
    // parse_value: successful conversions
    // ------------------------------------------------------------------

    #[test]
    fn test_parse_value_string() {
        assert_eq!(
            parse_ok("hello", DataType::String),
            Value::String("hello".to_string())
        );
    }

    #[test]
    fn test_parse_value_string_empty() {
        // An empty string cell stays a string instead of becoming Null.
        assert_eq!(
            parse_ok("", DataType::String),
            Value::String("".to_string())
        );
    }

    #[test]
    fn test_parse_value_double() {
        assert_eq!(
            parse_ok("2.72", DataType::Double),
            Value::Double(OrderedFloat::from(2.72))
        );
    }

    #[test]
    fn test_parse_value_double_negative() {
        assert_eq!(
            parse_ok("-123.456", DataType::Double),
            Value::Double(OrderedFloat::from(-123.456))
        );
    }

    #[test]
    fn test_parse_value_double_scientific() {
        assert_eq!(
            parse_ok("1.5e10", DataType::Double),
            Value::Double(OrderedFloat::from(1.5e10))
        );
    }

    #[test]
    fn test_parse_value_bool() {
        assert_eq!(parse_ok("true", DataType::Bool), Value::Bool(true));
        assert_eq!(parse_ok("false", DataType::Bool), Value::Bool(false));
    }

    #[test]
    fn test_parse_value_bool_case_insensitive() {
        assert_eq!(parse_ok("TRUE", DataType::Bool), Value::Bool(true));
        assert_eq!(parse_ok("FALSE", DataType::Bool), Value::Bool(false));
        assert_eq!(parse_ok("False", DataType::Bool), Value::Bool(false));
    }

    #[test]
    fn test_parse_value_long() {
        assert_eq!(parse_ok("-42", DataType::Long), Value::Long(-42));
    }

    #[test]
    fn test_parse_value_long_max() {
        assert_eq!(
            parse_ok("9223372036854775807", DataType::Long),
            Value::Long(i64::MAX)
        );
    }

    #[test]
    fn test_parse_value_long_min() {
        assert_eq!(
            parse_ok("-9223372036854775808", DataType::Long),
            Value::Long(i64::MIN)
        );
    }

    #[test]
    fn test_parse_value_unsigned_long() {
        assert_eq!(
            parse_ok("42", DataType::UnsignedLong),
            Value::UnsignedLong(42)
        );
    }

    #[test]
    fn test_parse_value_unsigned_long_max() {
        assert_eq!(
            parse_ok("18446744073709551615", DataType::UnsignedLong),
            Value::UnsignedLong(u64::MAX)
        );
    }

    #[test]
    fn test_parse_value_duration() {
        // 1h30m = 5400 s.
        let expected = chrono::Duration::nanoseconds(5_400_000_000_000);
        assert_eq!(
            parse_ok("1h30m", DataType::Duration),
            Value::Duration(expected)
        );
    }

    #[test]
    fn test_parse_value_duration_nanoseconds() {
        let expected = chrono::Duration::nanoseconds(100);
        assert_eq!(
            parse_ok("100ns", DataType::Duration),
            Value::Duration(expected)
        );
    }

    #[test]
    fn test_parse_value_duration_complex() {
        // 2h45m30s = 9930 s.
        let expected = chrono::Duration::nanoseconds(9_930_000_000_000);
        assert_eq!(
            parse_ok("2h45m30s", DataType::Duration),
            Value::Duration(expected)
        );
    }

    #[test]
    fn test_parse_value_base64() {
        assert_eq!(
            parse_ok("SGVsbG8gV29ybGQ=", DataType::Base64Binary),
            Value::Base64Binary(b"Hello World".to_vec())
        );
    }

    #[test]
    fn test_parse_value_base64_empty() {
        assert_eq!(parse_ok("", DataType::Base64Binary), Value::Null);
    }

    #[test]
    fn test_parse_value_time_rfc3339() {
        match parse_ok("2023-11-14T12:30:45Z", DataType::TimeRFC) {
            Value::TimeRFC(dt) => {
                assert_eq!(dt.year(), 2023);
                assert_eq!(dt.month(), 11);
                assert_eq!(dt.day(), 14);
                assert_eq!(dt.hour(), 12);
                assert_eq!(dt.minute(), 30);
                assert_eq!(dt.second(), 45);
            }
            _ => panic!("Expected TimeRFC value"),
        }
    }

    #[test]
    fn test_parse_value_time_rfc3339_with_timezone() {
        match parse_ok("2023-11-14T12:30:45+09:00", DataType::TimeRFC) {
            Value::TimeRFC(dt) => {
                assert_eq!(dt.year(), 2023);
                assert_eq!(dt.offset().local_minus_utc(), 9 * 3600);
            }
            _ => panic!("Expected TimeRFC value"),
        }
    }

    #[test]
    fn test_parse_value_time_rfc3339_nano() {
        match parse_ok("2023-11-14T12:30:45.123456789Z", DataType::TimeRFC) {
            Value::TimeRFC(dt) => assert_eq!(dt.nanosecond(), 123456789),
            _ => panic!("Expected TimeRFC value"),
        }
    }

    #[test]
    fn test_parse_value_empty_is_null() {
        assert_eq!(parse_ok("", DataType::Long), Value::Null);
    }

    #[test]
    fn test_parse_value_empty_is_null_for_all_non_string_types() {
        let non_string_types = [
            DataType::Double,
            DataType::Long,
            DataType::UnsignedLong,
            DataType::Bool,
            DataType::Duration,
            DataType::Base64Binary,
            DataType::TimeRFC,
        ];

        for data_type in non_string_types.iter() {
            assert_eq!(parse_ok("", *data_type), Value::Null);
        }
    }

    // ------------------------------------------------------------------
    // parse_value: rejected inputs
    // ------------------------------------------------------------------

    #[test]
    fn test_parse_value_invalid_double() {
        let outcome = parse_value("not_a_number", DataType::Double, "test");
        assert!(matches!(outcome, Err(Error::Parse { .. })));
    }

    #[test]
    fn test_parse_value_invalid_long() {
        assert!(parse_value("12.5", DataType::Long, "test").is_err());
    }

    #[test]
    fn test_parse_value_invalid_long_overflow() {
        assert!(parse_value("9999999999999999999999", DataType::Long, "test").is_err());
    }

    #[test]
    fn test_parse_value_invalid_unsigned_long_negative() {
        assert!(parse_value("-1", DataType::UnsignedLong, "test").is_err());
    }

    #[test]
    fn test_parse_value_invalid_duration() {
        assert!(parse_value("not_a_duration", DataType::Duration, "test").is_err());
    }

    #[test]
    fn test_parse_value_invalid_base64() {
        assert!(parse_value("!!invalid!!", DataType::Base64Binary, "test").is_err());
    }

    #[test]
    fn test_parse_value_invalid_time() {
        assert!(parse_value("not-a-timestamp", DataType::TimeRFC, "test").is_err());
    }

    #[test]
    fn test_parse_value_invalid_time_format() {
        // A date/time that is not in RFC 3339 form must be rejected.
        assert!(parse_value("2023/11/14 12:30:45", DataType::TimeRFC, "test").is_err());
    }

    // ------------------------------------------------------------------
    // AnnotatedCsvParser: end-to-end parsing
    // ------------------------------------------------------------------

    /// Builds a parser that reads from an in-memory copy of `s`.
    fn parser_from_str(s: &str) -> AnnotatedCsvParser<Cursor<Vec<u8>>> {
        let bytes = s.as_bytes().to_vec();
        AnnotatedCsvParser::new(Cursor::new(bytes))
    }

    #[tokio::test]
    async fn test_parser_basic_csv() {
        let csv = r#"#datatype,string,long,double
#group,false,false,false
#default,,0,0.0
,name,count,value
,alice,10,1.5
,bob,20,2.5
"#;
        let mut parser = parser_from_str(csv);

        let first = parser.next().await.unwrap().unwrap();
        assert_eq!(first.get_string("name"), Some("alice".to_string()));
        assert_eq!(first.get_long("count"), Some(10));
        assert_eq!(first.get_double("value"), Some(1.5));

        let second = parser.next().await.unwrap().unwrap();
        assert_eq!(second.get_string("name"), Some("bob".to_string()));
        assert_eq!(second.get_long("count"), Some(20));
        assert_eq!(second.get_double("value"), Some(2.5));

        // No further records once both data rows are consumed.
        assert!(parser.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_parser_empty_input() {
        let mut parser = parser_from_str("");

        // Empty input is a clean end of stream, not an error.
        assert!(parser.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_parser_empty_result_set() {
        // Annotations and header only, without any data rows.
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,value
"#;
        let mut parser = parser_from_str(csv);

        assert!(parser.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_parser_missing_datatype_annotation() {
        let csv = r#"#group,false,false
#default,,
,name,value
,alice,10
"#;
        let mut parser = parser_from_str(csv);

        let outcome = parser.next().await;
        assert!(matches!(outcome, Err(Error::MissingAnnotation(_))));
    }

    #[tokio::test]
    async fn test_parser_column_count_mismatch() {
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,value
,alice,10,extra_column
"#;
        let mut parser = parser_from_str(csv);

        let outcome = parser.next().await;
        assert!(matches!(outcome, Err(Error::ColumnMismatch { .. })));
    }

    #[tokio::test]
    async fn test_parser_influxdb_error_response() {
        let csv = r#"#datatype,string,string
#group,true,true
#default,,
,error,reference
,bucket not found,some-reference-id
"#;
        let mut parser = parser_from_str(csv);

        match parser.next().await {
            Err(Error::QueryError { message, reference }) => {
                assert_eq!(message, "bucket not found");
                assert_eq!(reference, Some("some-reference-id".to_string()));
            }
            Err(_) => panic!("Expected QueryError"),
            Ok(_) => panic!("expected the error table to produce an error"),
        }
    }

    #[tokio::test]
    async fn test_parser_influxdb_error_response_no_reference() {
        let csv = r#"#datatype,string,string
#group,true,true
#default,,
,error,reference
,query syntax error,
"#;
        let mut parser = parser_from_str(csv);

        match parser.next().await {
            Err(Error::QueryError { message, reference }) => {
                assert_eq!(message, "query syntax error");
                assert!(reference.is_none());
            }
            Err(_) => panic!("Expected QueryError"),
            Ok(_) => panic!("expected the error table to produce an error"),
        }
    }

    #[tokio::test]
    async fn test_parser_multiple_tables() {
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,value
,alice,10

#datatype,string,double
#group,false,false
#default,,
,name,score
,bob,95.5
"#;
        let mut parser = parser_from_str(csv);

        let from_first_table = parser.next().await.unwrap().unwrap();
        assert_eq!(from_first_table.table, 0);
        assert_eq!(
            from_first_table.get_string("name"),
            Some("alice".to_string())
        );
        assert_eq!(from_first_table.get_long("value"), Some(10));

        let from_second_table = parser.next().await.unwrap().unwrap();
        assert_eq!(from_second_table.table, 1);
        assert_eq!(
            from_second_table.get_string("name"),
            Some("bob".to_string())
        );
        assert_eq!(from_second_table.get_double("score"), Some(95.5));

        assert!(parser.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_parser_default_values() {
        let csv = r#"#datatype,string,long,double
#group,false,false,false
#default,unknown,0,1.0
,name,count,value
,alice,,
"#;
        let mut parser = parser_from_str(csv);

        let record = parser.next().await.unwrap().unwrap();
        assert_eq!(record.get_string("name"), Some("alice".to_string()));
        // Empty cells fall back to the values from the #default annotation.
        assert_eq!(record.get_long("count"), Some(0));
        assert_eq!(record.get_double("value"), Some(1.0));
    }

    #[tokio::test]
    async fn test_parser_group_annotation() {
        let csv = r#"#datatype,string,string,long
#group,true,false,false
#default,,,
,_measurement,host,value
,cpu,server1,100
"#;
        let mut parser = parser_from_str(csv);

        let record = parser.next().await.unwrap().unwrap();
        assert_eq!(record.get_string("_measurement"), Some("cpu".to_string()));
        assert_eq!(record.get_string("host"), Some("server1".to_string()));
        assert_eq!(record.get_long("value"), Some(100));
    }

    #[tokio::test]
    async fn test_parser_all_data_types() {
        let csv = r#"#datatype,string,long,unsignedLong,double,boolean,dateTime:RFC3339
#group,false,false,false,false,false,false
#default,,,,,,
,str,lng,ulng,dbl,bl,ts
,hello,-42,18446744073709551615,2.72,true,2023-11-14T12:00:00Z
"#;
        let mut parser = parser_from_str(csv);

        let record = parser.next().await.unwrap().unwrap();
        assert_eq!(record.get_string("str"), Some("hello".to_string()));
        assert_eq!(record.get_long("lng"), Some(-42));

        let unsigned = record.values.get("ulng").and_then(|v| v.as_unsigned_long());
        assert_eq!(unsigned, Some(u64::MAX));

        assert_eq!(record.get_double("dbl"), Some(2.72));
        assert_eq!(record.get_bool("bl"), Some(true));

        let timestamp = record.values.get("ts").and_then(|v| v.as_time());
        assert!(timestamp.is_some());
    }

    #[tokio::test]
    async fn test_parser_skips_empty_rows() {
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,value

,alice,10

,bob,20

"#;
        let mut parser = parser_from_str(csv);

        let first = parser.next().await.unwrap().unwrap();
        assert_eq!(first.get_string("name"), Some("alice".to_string()));

        let second = parser.next().await.unwrap().unwrap();
        assert_eq!(second.get_string("name"), Some("bob".to_string()));

        assert!(parser.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_parser_invalid_first_cell() {
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,value
invalid,alice,10
"#;
        let mut parser = parser_from_str(csv);

        let outcome = parser.next().await;
        assert!(matches!(outcome, Err(Error::Parse { .. })));
    }

    #[tokio::test]
    async fn test_parser_unknown_datatype() {
        let csv = r#"#datatype,string,unknown_type
#group,false,false
#default,,
,name,value
,alice,10
"#;
        let mut parser = parser_from_str(csv);

        let outcome = parser.next().await;
        assert!(matches!(outcome, Err(Error::UnknownDataType(_))));
    }
}