//! Async parser for InfluxDB annotated CSV format.
//!
//! This module provides a streaming parser for InfluxDB's annotated CSV format,
//! which is the format returned by the `/api/v2/query` endpoint.
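//!
//! For reference, a minimal annotated CSV response has the shape below (this mirrors
//! the fixtures used by the tests at the bottom of this file):
//!
//! ```text
//! #datatype,string,long,double
//! #group,false,false,false
//! #default,,0,0.0
//! ,name,count,value
//! ,alice,10,1.5
//! ,bob,20,2.5
//! ```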

use std::collections::BTreeMap;
use std::str::FromStr;

use base64::Engine;
use chrono::DateTime;
use csv_async::{AsyncReaderBuilder, StringRecord, Trim};
use futures::StreamExt;
use go_parse_duration::parse_duration;
use ordered_float::OrderedFloat;
use tokio::io::AsyncRead;

use crate::error::{Error, Result};
use crate::types::{DataType, FluxRecord, FluxTableMetadata};
use crate::value::Value;

/// Internal state of the CSV parser.
///
/// State transitions:
/// ```text
/// Normal -> Annotation (when # row encountered)
/// Annotation -> Normal (after header row)
/// Annotation -> Error (when error table detected)
/// Error -> (terminates with error)
/// ```
#[derive(PartialEq, Clone, Copy)]
enum ParsingState {
    /// Normal data rows.
    Normal,
    /// Processing annotation rows.
    Annotation,
    /// Error state (InfluxDB returned an error in the CSV).
    Error,
}

/// Result of processing a single row.
enum RowAction {
    /// Continue to next row (annotation or header processed).
    Continue,
    /// Return a parsed record.
    Record(FluxRecord),
    /// Return an error.
    Error(Error),
}

/// Async streaming parser for InfluxDB annotated CSV.
///
/// This parser reads an async byte stream and yields `FluxRecord`s one at a time,
/// without loading the entire response into memory.
///
/// # Example
///
/// ```ignore
/// use influxdb_stream::parser::AnnotatedCsvParser;
/// use tokio::io::AsyncRead;
///
/// async fn parse<R: AsyncRead + Unpin + Send>(reader: R) {
///     let mut parser = AnnotatedCsvParser::new(reader);
///     while let Some(record) = parser.next().await.transpose() {
///         match record {
///             Ok(rec) => println!("Got record: {:?}", rec),
///             Err(e) => eprintln!("Parse error: {}", e),
///         }
///     }
/// }
/// ```
pub struct AnnotatedCsvParser<R: AsyncRead + Unpin> {
    csv: csv_async::AsyncReader<R>,
    table_position: i32,
    table: Option<FluxTableMetadata>,
    parsing_state: ParsingState,
    data_type_annotation_found: bool,
}

impl<R: AsyncRead + Unpin + Send> AnnotatedCsvParser<R> {
    /// Create a new parser from an async reader.
    pub fn new(reader: R) -> Self {
        let csv = AsyncReaderBuilder::new()
            .has_headers(false) // We handle headers/annotations ourselves
            .trim(Trim::Fields)
            .flexible(true)
            .create_reader(reader);

        Self {
            csv,
            table_position: 0,
            table: None,
            parsing_state: ParsingState::Normal,
            data_type_annotation_found: false,
        }
    }

    /// Parse and return the next record.
    ///
    /// Returns:
    /// - `Ok(Some(record))` - Successfully parsed a record
    /// - `Ok(None)` - End of stream (EOF)
    /// - `Err(e)` - Parse error
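    ///
    /// # Example
    ///
    /// A typical drain loop, collecting every record (sketch; assumes `parser` was
    /// constructed from some async reader as in the struct-level example):
    ///
    /// ```ignore
    /// let mut records = Vec::new();
    /// while let Some(record) = parser.next().await? {
    ///     records.push(record);
    /// }
    /// ```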
    pub async fn next(&mut self) -> Result<Option<FluxRecord>> {
        let mut records = self.csv.records();

        loop {
            let row = match records.next().await {
                Some(Ok(r)) => r,
                Some(Err(e)) => return Err(Error::Csv(format!("CSV read error: {}", e))),
                None => return Ok(None), // EOF
            };

            // Skip empty rows or rows with only 1 column
            if row.len() <= 1 {
                continue;
            }

            // Detect the start of a new annotation block; this resets the table
            // metadata and switches the state to Annotation.
            detect_annotation_start(
                &row,
                self.parsing_state,
                &mut self.table,
                &mut self.table_position,
                &mut self.parsing_state,
                &mut self.data_type_annotation_found,
            );

            // Get table reference or return error if missing
            let table = match &mut self.table {
                Some(t) => t,
                None => {
                    return Err(Error::MissingAnnotation(
                        "No annotations found before data".to_string(),
                    ));
                }
            };

            // Validate column count
            if row.len() - 1 != table.columns.len() {
                return Err(Error::ColumnMismatch {
                    expected: table.columns.len(),
                    actual: row.len() - 1,
                });
            }

            // Process the row based on its first cell
            let action = process_row(
                &row,
                table,
                self.parsing_state,
                self.data_type_annotation_found,
                &mut self.parsing_state,
                &mut self.data_type_annotation_found,
            )?;

            match action {
                RowAction::Continue => continue,
                RowAction::Record(record) => return Ok(Some(record)),
                RowAction::Error(e) => return Err(e),
            }
        }
    }
}

/// Detect if a row starts a new annotation block.
/// Returns true if a new annotation block was started.
fn detect_annotation_start(
    row: &StringRecord,
    current_state: ParsingState,
    table: &mut Option<FluxTableMetadata>,
    table_position: &mut i32,
    parsing_state: &mut ParsingState,
    data_type_annotation_found: &mut bool,
) -> bool {
    if let Some(first) = row.get(0) {
        if !first.is_empty() && first.starts_with('#') && current_state == ParsingState::Normal {
            // Start of a new table
            *table = Some(FluxTableMetadata::new(*table_position, row.len() - 1));
            *table_position += 1;
            *parsing_state = ParsingState::Annotation;
            *data_type_annotation_found = false;
            return true;
        }
    }
    false
}

/// Process a single row and return the appropriate action.
fn process_row(
    row: &StringRecord,
    table: &mut FluxTableMetadata,
    current_state: ParsingState,
    current_datatype_found: bool,
    parsing_state: &mut ParsingState,
    data_type_annotation_found: &mut bool,
) -> Result<RowAction> {
    let first_cell = row.get(0).unwrap_or_default();

    match first_cell {
        "" => process_empty_first_cell(
            row,
            table,
            current_state,
            current_datatype_found,
            parsing_state,
        ),
        "#datatype" => {
            process_datatype_annotation(row, table, data_type_annotation_found)?;
            Ok(RowAction::Continue)
        }
        "#group" => {
            process_group_annotation(row, table);
            Ok(RowAction::Continue)
        }
        "#default" => {
            process_default_annotation(row, table);
            Ok(RowAction::Continue)
        }
        other => Err(Error::Parse {
            message: format!("Invalid first cell: {}", other),
        }),
    }
}

/// Process a row with empty first cell (header, data, or error row).
fn process_empty_first_cell(
    row: &StringRecord,
    table: &mut FluxTableMetadata,
    current_state: ParsingState,
    data_type_annotation_found: bool,
    parsing_state: &mut ParsingState,
) -> Result<RowAction> {
    match current_state {
        ParsingState::Annotation => {
            process_header_row(row, table, data_type_annotation_found, parsing_state)
        }
        ParsingState::Error => Ok(RowAction::Error(parse_error_response(row))),
        ParsingState::Normal => parse_data_row(row, table),
    }
}

/// Process the header row (first row after annotations with empty first cell).
fn process_header_row(
    row: &StringRecord,
    table: &mut FluxTableMetadata,
    data_type_annotation_found: bool,
    parsing_state: &mut ParsingState,
) -> Result<RowAction> {
    if !data_type_annotation_found {
        return Err(Error::MissingAnnotation(
            "#datatype annotation not found".to_string(),
        ));
    }

    // Check for error table
    if row.get(1).unwrap_or_default() == "error" {
        *parsing_state = ParsingState::Error;
        return Ok(RowAction::Continue);
    }

    // Fill column names from header row
    for i in 1..row.len() {
        if let Some(name) = row.get(i) {
            table.columns[i - 1].name = name.to_string();
        }
    }
    *parsing_state = ParsingState::Normal;

    Ok(RowAction::Continue)
}

/// Parse an error response from InfluxDB.
fn parse_error_response(row: &StringRecord) -> Error {
    let message = row
        .get(1)
        .filter(|s| !s.is_empty())
        .map(|s| s.to_string())
        .unwrap_or_else(|| "Unknown query error".to_string());

    let reference = row.get(2).filter(|s| !s.is_empty()).map(|s| s.to_string());

    Error::QueryError { message, reference }
}

/// Parse a data row into a FluxRecord.
///
/// Empty cells fall back to the column's `#default` value before parsing.
fn parse_data_row(row: &StringRecord, table: &FluxTableMetadata) -> Result<RowAction> {
    let mut values = BTreeMap::new();

    for i in 1..row.len() {
        let col = &table.columns[i - 1];
        let raw_value = row.get(i).unwrap_or_default();
        let value = if raw_value.is_empty() {
            col.default_value.as_str()
        } else {
            raw_value
        };

        let parsed = parse_value(value, col.data_type, &col.name)?;
        values.insert(col.name.clone(), parsed);
    }

    Ok(RowAction::Record(FluxRecord {
        table: table.position,
        values,
    }))
}

/// Process #datatype annotation row.
fn process_datatype_annotation(
    row: &StringRecord,
    table: &mut FluxTableMetadata,
    data_type_annotation_found: &mut bool,
) -> Result<()> {
    *data_type_annotation_found = true;

    for i in 1..row.len() {
        if let Some(type_str) = row.get(i) {
            let dt = DataType::from_str(type_str)?;
            table.columns[i - 1].data_type = dt;
        }
    }

    Ok(())
}

/// Process #group annotation row.
fn process_group_annotation(row: &StringRecord, table: &mut FluxTableMetadata) {
    for i in 1..row.len() {
        if let Some(value) = row.get(i) {
            table.columns[i - 1].group = value == "true";
        }
    }
}

/// Process #default annotation row.
fn process_default_annotation(row: &StringRecord, table: &mut FluxTableMetadata) {
    for i in 1..row.len() {
        if let Some(value) = row.get(i) {
            table.columns[i - 1].default_value = value.to_string();
        }
    }
}

/// Parse a string value into a Value based on the data type.
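///
/// A few conversions, as exercised by the unit tests below (sketch):
///
/// ```ignore
/// assert_eq!(parse_value("2.72", DataType::Double, "value")?, Value::Double(OrderedFloat::from(2.72)));
/// assert_eq!(parse_value("-42", DataType::Long, "value")?, Value::Long(-42));
/// assert_eq!(parse_value("", DataType::Long, "value")?, Value::Null);
/// ```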
fn parse_value(s: &str, data_type: DataType, column_name: &str) -> Result<Value> {
    // Handle empty strings as null for non-string types
    if s.is_empty() && data_type != DataType::String {
        return Ok(Value::Null);
    }

    match data_type {
        DataType::String => Ok(Value::String(s.to_string())),
        DataType::Double => {
            let v = s.parse::<f64>().map_err(|e| Error::Parse {
                message: format!("Invalid double '{}' for column '{}': {}", s, column_name, e),
            })?;
            Ok(Value::Double(OrderedFloat::from(v)))
        }
        DataType::Bool => {
            // InfluxDB emits "true"/"false"; any value other than "false"
            // (case-insensitive) is treated as true.
            let v = s.to_lowercase() != "false";
            Ok(Value::Bool(v))
        }
        DataType::Long => {
            let v = s.parse::<i64>().map_err(|e| Error::Parse {
                message: format!("Invalid long '{}' for column '{}': {}", s, column_name, e),
            })?;
            Ok(Value::Long(v))
        }
        DataType::UnsignedLong => {
            let v = s.parse::<u64>().map_err(|e| Error::Parse {
                message: format!(
                    "Invalid unsignedLong '{}' for column '{}': {}",
                    s, column_name, e
                ),
            })?;
            Ok(Value::UnsignedLong(v))
        }
        DataType::Duration => {
            let nanos = parse_duration(s).map_err(|_| Error::Parse {
                message: format!("Invalid duration '{}' for column '{}'", s, column_name),
            })?;
            Ok(Value::Duration(chrono::Duration::nanoseconds(nanos)))
        }
        DataType::Base64Binary => {
            let bytes = base64::engine::general_purpose::STANDARD
                .decode(s)
                .map_err(|e| Error::Parse {
                    message: format!("Invalid base64 '{}' for column '{}': {}", s, column_name, e),
                })?;
            Ok(Value::Base64Binary(bytes))
        }
        DataType::TimeRFC => {
            let t = DateTime::parse_from_rfc3339(s).map_err(|e| Error::Parse {
                message: format!(
                    "Invalid RFC3339 timestamp '{}' for column '{}': {}",
                    s, column_name, e
                ),
            })?;
            Ok(Value::TimeRFC(t))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Datelike, Timelike};
    use std::io::Cursor;

    // =========================================================================
    // parse_value tests - Basic types
    // =========================================================================

    #[test]
    fn test_parse_value_string() {
        let v = parse_value("hello", DataType::String, "test").unwrap();
        assert_eq!(v, Value::String("hello".to_string()));
    }

    #[test]
    fn test_parse_value_string_empty() {
        // Empty string should remain as empty string, not null
        let v = parse_value("", DataType::String, "test").unwrap();
        assert_eq!(v, Value::String("".to_string()));
    }

    #[test]
    fn test_parse_value_double() {
        let v = parse_value("2.72", DataType::Double, "test").unwrap();
        assert_eq!(v, Value::Double(OrderedFloat::from(2.72)));
    }

    #[test]
    fn test_parse_value_double_negative() {
        let v = parse_value("-123.456", DataType::Double, "test").unwrap();
        assert_eq!(v, Value::Double(OrderedFloat::from(-123.456)));
    }

    #[test]
    fn test_parse_value_double_scientific() {
        let v = parse_value("1.5e10", DataType::Double, "test").unwrap();
        assert_eq!(v, Value::Double(OrderedFloat::from(1.5e10)));
    }

    #[test]
    fn test_parse_value_bool() {
        assert_eq!(
            parse_value("true", DataType::Bool, "test").unwrap(),
            Value::Bool(true)
        );
        assert_eq!(
            parse_value("false", DataType::Bool, "test").unwrap(),
            Value::Bool(false)
        );
    }

    #[test]
    fn test_parse_value_bool_case_insensitive() {
        assert_eq!(
            parse_value("TRUE", DataType::Bool, "test").unwrap(),
            Value::Bool(true)
        );
        assert_eq!(
            parse_value("FALSE", DataType::Bool, "test").unwrap(),
            Value::Bool(false)
        );
        assert_eq!(
            parse_value("False", DataType::Bool, "test").unwrap(),
            Value::Bool(false)
        );
    }
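
    // The current implementation treats any non-empty value other than "false"
    // (case-insensitive) as true; this test pins down that behavior.
    #[test]
    fn test_parse_value_bool_nonstandard_is_true() {
        assert_eq!(
            parse_value("yes", DataType::Bool, "test").unwrap(),
            Value::Bool(true)
        );
        assert_eq!(
            parse_value("1", DataType::Bool, "test").unwrap(),
            Value::Bool(true)
        );
    }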

    #[test]
    fn test_parse_value_long() {
        let v = parse_value("-42", DataType::Long, "test").unwrap();
        assert_eq!(v, Value::Long(-42));
    }

    #[test]
    fn test_parse_value_long_max() {
        let v = parse_value("9223372036854775807", DataType::Long, "test").unwrap();
        assert_eq!(v, Value::Long(i64::MAX));
    }

    #[test]
    fn test_parse_value_long_min() {
        let v = parse_value("-9223372036854775808", DataType::Long, "test").unwrap();
        assert_eq!(v, Value::Long(i64::MIN));
    }

    #[test]
    fn test_parse_value_unsigned_long() {
        let v = parse_value("42", DataType::UnsignedLong, "test").unwrap();
        assert_eq!(v, Value::UnsignedLong(42));
    }

    #[test]
    fn test_parse_value_unsigned_long_max() {
        let v = parse_value("18446744073709551615", DataType::UnsignedLong, "test").unwrap();
        assert_eq!(v, Value::UnsignedLong(u64::MAX));
    }

    #[test]
    fn test_parse_value_duration() {
        let v = parse_value("1h30m", DataType::Duration, "test").unwrap();
        let expected = chrono::Duration::nanoseconds(5_400_000_000_000); // 1.5 hours in nanos
        assert_eq!(v, Value::Duration(expected));
    }

    #[test]
    fn test_parse_value_duration_nanoseconds() {
        let v = parse_value("100ns", DataType::Duration, "test").unwrap();
        let expected = chrono::Duration::nanoseconds(100);
        assert_eq!(v, Value::Duration(expected));
    }

    #[test]
    fn test_parse_value_duration_complex() {
        let v = parse_value("2h45m30s", DataType::Duration, "test").unwrap();
        // 2*3600 + 45*60 + 30 = 9930 seconds = 9_930_000_000_000 ns
        let expected = chrono::Duration::nanoseconds(9_930_000_000_000);
        assert_eq!(v, Value::Duration(expected));
    }

    #[test]
    fn test_parse_value_base64() {
        let v = parse_value("SGVsbG8gV29ybGQ=", DataType::Base64Binary, "test").unwrap();
        assert_eq!(v, Value::Base64Binary(b"Hello World".to_vec()));
    }

    #[test]
    fn test_parse_value_base64_empty() {
        let v = parse_value("", DataType::Base64Binary, "test").unwrap();
        assert_eq!(v, Value::Null);
    }

    #[test]
    fn test_parse_value_time_rfc3339() {
        let v = parse_value("2023-11-14T12:30:45Z", DataType::TimeRFC, "test").unwrap();
        if let Value::TimeRFC(dt) = v {
            assert_eq!(dt.year(), 2023);
            assert_eq!(dt.month(), 11);
            assert_eq!(dt.day(), 14);
            assert_eq!(dt.hour(), 12);
            assert_eq!(dt.minute(), 30);
            assert_eq!(dt.second(), 45);
        } else {
            panic!("Expected TimeRFC value");
        }
    }

    #[test]
    fn test_parse_value_time_rfc3339_with_timezone() {
        let v = parse_value("2023-11-14T12:30:45+09:00", DataType::TimeRFC, "test").unwrap();
        if let Value::TimeRFC(dt) = v {
            assert_eq!(dt.year(), 2023);
            assert_eq!(dt.offset().local_minus_utc(), 9 * 3600);
        } else {
            panic!("Expected TimeRFC value");
        }
    }

    #[test]
    fn test_parse_value_time_rfc3339_nano() {
        let v = parse_value("2023-11-14T12:30:45.123456789Z", DataType::TimeRFC, "test").unwrap();
        if let Value::TimeRFC(dt) = v {
            assert_eq!(dt.nanosecond(), 123456789);
        } else {
            panic!("Expected TimeRFC value");
        }
    }

    #[test]
    fn test_parse_value_empty_is_null() {
        let v = parse_value("", DataType::Long, "test").unwrap();
        assert_eq!(v, Value::Null);
    }

    #[test]
    fn test_parse_value_empty_is_null_for_all_non_string_types() {
        assert_eq!(
            parse_value("", DataType::Double, "test").unwrap(),
            Value::Null
        );
        assert_eq!(
            parse_value("", DataType::Long, "test").unwrap(),
            Value::Null
        );
        assert_eq!(
            parse_value("", DataType::UnsignedLong, "test").unwrap(),
            Value::Null
        );
        assert_eq!(
            parse_value("", DataType::Bool, "test").unwrap(),
            Value::Null
        );
        assert_eq!(
            parse_value("", DataType::Duration, "test").unwrap(),
            Value::Null
        );
        assert_eq!(
            parse_value("", DataType::Base64Binary, "test").unwrap(),
            Value::Null
        );
        assert_eq!(
            parse_value("", DataType::TimeRFC, "test").unwrap(),
            Value::Null
        );
    }

    // =========================================================================
    // parse_value tests - Error cases
    // =========================================================================

    #[test]
    fn test_parse_value_invalid_double() {
        let result = parse_value("not_a_number", DataType::Double, "test");
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, Error::Parse { .. }));
    }

    #[test]
    fn test_parse_value_invalid_long() {
        let result = parse_value("12.5", DataType::Long, "test");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_value_invalid_long_overflow() {
        let result = parse_value("9999999999999999999999", DataType::Long, "test");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_value_invalid_unsigned_long_negative() {
        let result = parse_value("-1", DataType::UnsignedLong, "test");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_value_invalid_duration() {
        let result = parse_value("not_a_duration", DataType::Duration, "test");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_value_invalid_base64() {
        let result = parse_value("!!invalid!!", DataType::Base64Binary, "test");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_value_invalid_time() {
        let result = parse_value("not-a-timestamp", DataType::TimeRFC, "test");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_value_invalid_time_format() {
        // Valid date but wrong format
        let result = parse_value("2023/11/14 12:30:45", DataType::TimeRFC, "test");
        assert!(result.is_err());
    }

    // =========================================================================
    // AnnotatedCsvParser tests - Full flow
    // =========================================================================

    /// Helper to create a parser from a string
    fn parser_from_str(s: &str) -> AnnotatedCsvParser<Cursor<Vec<u8>>> {
        AnnotatedCsvParser::new(Cursor::new(s.as_bytes().to_vec()))
    }

    #[tokio::test]
    async fn test_parser_basic_csv() {
        let csv = r#"#datatype,string,long,double
#group,false,false,false
#default,,0,0.0
,name,count,value
,alice,10,1.5
,bob,20,2.5
"#;
        let mut parser = parser_from_str(csv);

        let record1 = parser.next().await.unwrap().unwrap();
        assert_eq!(record1.get_string("name"), Some("alice".to_string()));
        assert_eq!(record1.get_long("count"), Some(10));
        assert_eq!(record1.get_double("value"), Some(1.5));

        let record2 = parser.next().await.unwrap().unwrap();
        assert_eq!(record2.get_string("name"), Some("bob".to_string()));
        assert_eq!(record2.get_long("count"), Some(20));
        assert_eq!(record2.get_double("value"), Some(2.5));

        // EOF
        assert!(parser.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_parser_empty_input() {
        let csv = "";
        let mut parser = parser_from_str(csv);

        // Should return None (EOF) immediately
        assert!(parser.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_parser_empty_result_set() {
        // Only annotations, no data rows
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,value
"#;
        let mut parser = parser_from_str(csv);

        // No data rows, should return None
        assert!(parser.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_parser_missing_datatype_annotation() {
        let csv = r#"#group,false,false
#default,,
,name,value
,alice,10
"#;
        let mut parser = parser_from_str(csv);

        let result = parser.next().await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), Error::MissingAnnotation(_)));
    }

    #[tokio::test]
    async fn test_parser_column_count_mismatch() {
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,value
,alice,10,extra_column
"#;
        let mut parser = parser_from_str(csv);

        let result = parser.next().await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), Error::ColumnMismatch { .. }));
    }

    #[tokio::test]
    async fn test_parser_influxdb_error_response() {
        let csv = r#"#datatype,string,string
#group,true,true
#default,,
,error,reference
,bucket not found,some-reference-id
"#;
        let mut parser = parser_from_str(csv);

        let result = parser.next().await;
        assert!(result.is_err());
        if let Error::QueryError { message, reference } = result.unwrap_err() {
            assert_eq!(message, "bucket not found");
            assert_eq!(reference, Some("some-reference-id".to_string()));
        } else {
            panic!("Expected QueryError");
        }
    }

    #[tokio::test]
    async fn test_parser_influxdb_error_response_no_reference() {
        let csv = r#"#datatype,string,string
#group,true,true
#default,,
,error,reference
,query syntax error,
"#;
        let mut parser = parser_from_str(csv);

        let result = parser.next().await;
        assert!(result.is_err());
        if let Error::QueryError { message, reference } = result.unwrap_err() {
            assert_eq!(message, "query syntax error");
            assert!(reference.is_none());
        } else {
            panic!("Expected QueryError");
        }
    }

    #[tokio::test]
    async fn test_parser_multiple_tables() {
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,value
,alice,10

#datatype,string,double
#group,false,false
#default,,
,name,score
,bob,95.5
"#;
        let mut parser = parser_from_str(csv);

        let record1 = parser.next().await.unwrap().unwrap();
        assert_eq!(record1.table, 0);
        assert_eq!(record1.get_string("name"), Some("alice".to_string()));
        assert_eq!(record1.get_long("value"), Some(10));

        let record2 = parser.next().await.unwrap().unwrap();
        assert_eq!(record2.table, 1);
        assert_eq!(record2.get_string("name"), Some("bob".to_string()));
        assert_eq!(record2.get_double("score"), Some(95.5));

        assert!(parser.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_parser_default_values() {
        let csv = r#"#datatype,string,long,double
#group,false,false,false
#default,unknown,0,1.0
,name,count,value
,alice,,
"#;
        let mut parser = parser_from_str(csv);

        let record = parser.next().await.unwrap().unwrap();
        assert_eq!(record.get_string("name"), Some("alice".to_string()));
        // Empty values should use defaults
        assert_eq!(record.get_long("count"), Some(0));
        assert_eq!(record.get_double("value"), Some(1.0));
    }
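
    // With an empty #default, an empty cell parses to Value::Null for non-string
    // columns; this test pins down that behavior.
    #[tokio::test]
    async fn test_parser_empty_cell_without_default_is_null() {
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,count
,alice,
"#;
        let mut parser = parser_from_str(csv);

        let record = parser.next().await.unwrap().unwrap();
        assert_eq!(record.get_string("name"), Some("alice".to_string()));
        assert_eq!(record.values.get("count"), Some(&Value::Null));
    }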

    #[tokio::test]
    async fn test_parser_group_annotation() {
        let csv = r#"#datatype,string,string,long
#group,true,false,false
#default,,,
,_measurement,host,value
,cpu,server1,100
"#;
        let mut parser = parser_from_str(csv);

        let record = parser.next().await.unwrap().unwrap();
        assert_eq!(record.get_string("_measurement"), Some("cpu".to_string()));
        assert_eq!(record.get_string("host"), Some("server1".to_string()));
        assert_eq!(record.get_long("value"), Some(100));
    }

    #[tokio::test]
    async fn test_parser_all_data_types() {
        let csv = r#"#datatype,string,long,unsignedLong,double,boolean,dateTime:RFC3339
#group,false,false,false,false,false,false
#default,,,,,,
,str,lng,ulng,dbl,bl,ts
,hello,-42,18446744073709551615,2.72,true,2023-11-14T12:00:00Z
"#;
        let mut parser = parser_from_str(csv);

        let record = parser.next().await.unwrap().unwrap();
        assert_eq!(record.get_string("str"), Some("hello".to_string()));
        assert_eq!(record.get_long("lng"), Some(-42));
        assert_eq!(
            record.values.get("ulng").and_then(|v| v.as_unsigned_long()),
            Some(u64::MAX)
        );
        assert_eq!(record.get_double("dbl"), Some(2.72));
        assert_eq!(record.get_bool("bl"), Some(true));
        assert!(record.values.get("ts").and_then(|v| v.as_time()).is_some());
    }

    #[tokio::test]
    async fn test_parser_skips_empty_rows() {
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,value

,alice,10

,bob,20

"#;
        let mut parser = parser_from_str(csv);

        let record1 = parser.next().await.unwrap().unwrap();
        assert_eq!(record1.get_string("name"), Some("alice".to_string()));

        let record2 = parser.next().await.unwrap().unwrap();
        assert_eq!(record2.get_string("name"), Some("bob".to_string()));

        assert!(parser.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_parser_invalid_first_cell() {
        let csv = r#"#datatype,string,long
#group,false,false
#default,,
,name,value
invalid,alice,10
"#;
        let mut parser = parser_from_str(csv);

        let result = parser.next().await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), Error::Parse { .. }));
    }

    #[tokio::test]
    async fn test_parser_unknown_datatype() {
        let csv = r#"#datatype,string,unknown_type
#group,false,false
#default,,
,name,value
,alice,10
"#;
        let mut parser = parser_from_str(csv);

        let result = parser.next().await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), Error::UnknownDataType(_)));
    }
}