influxdb_stream/
types.rs

1//! Core types for InfluxDB Flux query results.
2
3use std::collections::BTreeMap;
4use std::str::FromStr;
5
6use crate::error::Error;
7use crate::value::Value;
8
/// Column data types that can appear in InfluxDB annotated CSV.
///
/// Each variant is parsed from (and, via `Display`, rendered as) a keyword;
/// the keyword is quoted in the variant's documentation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    /// Text value (keyword `"string"`).
    String,
    /// 64-bit floating point number (keyword `"double"`).
    Double,
    /// Boolean value (keyword `"boolean"`).
    Bool,
    /// Signed 64-bit integer (keyword `"long"`).
    Long,
    /// Unsigned 64-bit integer (keyword `"unsignedLong"`).
    UnsignedLong,
    /// Go-style duration such as "1h30m" (keyword `"duration"`).
    Duration,
    /// Binary data encoded as Base64 (keyword `"base64Binary"`).
    Base64Binary,
    /// RFC3339 timestamp, optionally with nanosecond precision
    /// (keywords `"dateTime:RFC3339"` and `"dateTime:RFC3339Nano"`).
    TimeRFC,
}
29
30impl FromStr for DataType {
31    type Err = Error;
32
33    fn from_str(input: &str) -> Result<Self, Self::Err> {
34        match input {
35            "string" => Ok(Self::String),
36            "double" => Ok(Self::Double),
37            "boolean" => Ok(Self::Bool),
38            "long" => Ok(Self::Long),
39            "unsignedLong" => Ok(Self::UnsignedLong),
40            "duration" => Ok(Self::Duration),
41            "base64Binary" => Ok(Self::Base64Binary),
42            "dateTime:RFC3339" | "dateTime:RFC3339Nano" => Ok(Self::TimeRFC),
43            _ => Err(Error::UnknownDataType(input.to_string())),
44        }
45    }
46}
47
48impl std::fmt::Display for DataType {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        let s = match self {
51            DataType::String => "string",
52            DataType::Double => "double",
53            DataType::Bool => "boolean",
54            DataType::Long => "long",
55            DataType::UnsignedLong => "unsignedLong",
56            DataType::Duration => "duration",
57            DataType::Base64Binary => "base64Binary",
58            DataType::TimeRFC => "dateTime:RFC3339",
59        };
60        write!(f, "{}", s)
61    }
62}
63
64/// Metadata for a column in a Flux table.
65#[derive(Clone, Debug)]
66pub struct FluxColumn {
67    /// Column name.
68    pub name: String,
69    /// Data type of the column.
70    pub data_type: DataType,
71    /// Whether this column is part of the group key.
72    pub group: bool,
73    /// Default value for missing entries.
74    pub default_value: String,
75}
76
77impl FluxColumn {
78    /// Create a new FluxColumn with default values.
79    pub fn new() -> Self {
80        Self {
81            name: String::new(),
82            data_type: DataType::String,
83            group: false,
84            default_value: String::new(),
85        }
86    }
87}
88
89impl Default for FluxColumn {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95/// Metadata for a Flux table (one result set from a query).
96#[derive(Clone, Debug)]
97pub struct FluxTableMetadata {
98    /// Table position/index in the query results.
99    pub position: i32,
100    /// Column definitions for this table.
101    pub columns: Vec<FluxColumn>,
102}
103
104impl FluxTableMetadata {
105    /// Create a new FluxTableMetadata with the given position and column count.
106    pub fn new(position: i32, column_count: usize) -> Self {
107        let columns = (0..column_count).map(|_| FluxColumn::new()).collect();
108        Self { position, columns }
109    }
110
111    /// Get a column by name.
112    pub fn column(&self, name: &str) -> Option<&FluxColumn> {
113        self.columns.iter().find(|c| c.name == name)
114    }
115}
116
117/// A single record (row) from a Flux query result.
118#[derive(Clone, Debug)]
119pub struct FluxRecord {
120    /// Table index this record belongs to.
121    pub table: i32,
122    /// Column name to value mapping.
123    pub values: BTreeMap<String, Value>,
124}
125
126impl FluxRecord {
127    /// Create a new empty FluxRecord.
128    pub fn new(table: i32) -> Self {
129        Self {
130            table,
131            values: BTreeMap::new(),
132        }
133    }
134
135    /// Get a value by column name.
136    pub fn get(&self, name: &str) -> Option<&Value> {
137        self.values.get(name)
138    }
139
140    /// Get value as string.
141    pub fn get_string(&self, name: &str) -> Option<String> {
142        self.values.get(name).and_then(|v| v.string())
143    }
144
145    /// Get value as f64.
146    pub fn get_double(&self, name: &str) -> Option<f64> {
147        self.values.get(name).and_then(|v| v.as_double())
148    }
149
150    /// Get value as i64.
151    pub fn get_long(&self, name: &str) -> Option<i64> {
152        self.values.get(name).and_then(|v| v.as_long())
153    }
154
155    /// Get value as bool.
156    pub fn get_bool(&self, name: &str) -> Option<bool> {
157        self.values.get(name).and_then(|v| v.as_bool())
158    }
159
160    /// Get the timestamp (_time field).
161    pub fn time(&self) -> Option<&chrono::DateTime<chrono::FixedOffset>> {
162        self.values.get("_time").and_then(|v| v.as_time())
163    }
164
165    /// Get the measurement name (_measurement field).
166    pub fn measurement(&self) -> Option<String> {
167        self.get_string("_measurement")
168    }
169
170    /// Get the field name (_field).
171    pub fn field(&self) -> Option<String> {
172        self.get_string("_field")
173    }
174
175    /// Get the field value (_value).
176    pub fn value(&self) -> Option<&Value> {
177        self.values.get("_value")
178    }
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Datelike};
    use ordered_float::OrderedFloat;

    // -------------------------------------------------------------------------
    // Fixtures
    // -------------------------------------------------------------------------

    /// Shorthand for a `Value::String` built from a literal.
    fn text(s: &str) -> Value {
        Value::String(s.to_string())
    }

    /// Build a record for table 0 holding the given column/value pairs.
    fn record_with<I>(entries: I) -> FluxRecord
    where
        I: IntoIterator<Item = (&'static str, Value)>,
    {
        let mut record = FluxRecord::new(0);
        for (column, value) in entries {
            record.values.insert(column.to_string(), value);
        }
        record
    }

    // -------------------------------------------------------------------------
    // DataType
    // -------------------------------------------------------------------------

    #[test]
    fn test_datatype_from_str() {
        let cases = [
            ("string", DataType::String),
            ("double", DataType::Double),
            ("boolean", DataType::Bool),
            ("long", DataType::Long),
            ("unsignedLong", DataType::UnsignedLong),
            ("duration", DataType::Duration),
            ("base64Binary", DataType::Base64Binary),
            ("dateTime:RFC3339", DataType::TimeRFC),
            ("dateTime:RFC3339Nano", DataType::TimeRFC),
        ];
        for (keyword, expected) in cases {
            let parsed: DataType = keyword.parse().unwrap();
            assert_eq!(parsed, expected, "keyword {:?}", keyword);
        }
    }

    #[test]
    fn test_datatype_from_str_unknown() {
        assert!("unknown".parse::<DataType>().is_err());
    }

    #[test]
    fn test_datatype_display() {
        let cases = [
            (DataType::String, "string"),
            (DataType::Double, "double"),
            (DataType::Bool, "boolean"),
            (DataType::Long, "long"),
            (DataType::UnsignedLong, "unsignedLong"),
            (DataType::Duration, "duration"),
            (DataType::Base64Binary, "base64Binary"),
            (DataType::TimeRFC, "dateTime:RFC3339"),
        ];
        for (data_type, expected) in cases {
            assert_eq!(data_type.to_string(), expected);
        }
    }

    #[test]
    fn test_datatype_roundtrip() {
        // Displaying a parsed keyword must give the keyword back.
        let keywords = [
            "string",
            "double",
            "boolean",
            "long",
            "unsignedLong",
            "duration",
            "base64Binary",
            "dateTime:RFC3339",
        ];
        for keyword in keywords {
            let rendered = keyword.parse::<DataType>().unwrap().to_string();
            assert_eq!(rendered, keyword);
        }
    }

    // -------------------------------------------------------------------------
    // FluxColumn
    // -------------------------------------------------------------------------

    #[test]
    fn test_flux_column_new() {
        let FluxColumn {
            name,
            data_type,
            group,
            default_value,
        } = FluxColumn::new();
        assert_eq!(name, "");
        assert_eq!(data_type, DataType::String);
        assert!(!group);
        assert_eq!(default_value, "");
    }

    #[test]
    fn test_flux_column_default() {
        let FluxColumn {
            name,
            data_type,
            group,
            default_value,
        } = FluxColumn::default();
        assert_eq!(name, "");
        assert_eq!(data_type, DataType::String);
        assert!(!group);
        assert_eq!(default_value, "");
    }

    // -------------------------------------------------------------------------
    // FluxTableMetadata
    // -------------------------------------------------------------------------

    #[test]
    fn test_flux_table_metadata_new() {
        let FluxTableMetadata { position, columns } = FluxTableMetadata::new(0, 3);
        assert_eq!(position, 0);
        assert_eq!(columns.len(), 3);
    }

    #[test]
    fn test_flux_table_metadata_column() {
        let mut table = FluxTableMetadata::new(0, 2);
        for (slot, label) in table.columns.iter_mut().zip(["col1", "col2"]) {
            slot.name = label.to_string();
        }

        let first = table.column("col1");
        assert!(first.is_some());
        assert_eq!(first.unwrap().name, "col1");
        assert!(table.column("col2").is_some());
        assert!(table.column("nonexistent").is_none());
    }

    // -------------------------------------------------------------------------
    // FluxRecord
    // -------------------------------------------------------------------------

    #[test]
    fn test_flux_record_new() {
        let FluxRecord { table, values } = FluxRecord::new(5);
        assert_eq!(table, 5);
        assert!(values.is_empty());
    }

    #[test]
    fn test_flux_record_get() {
        let record = record_with([("key", text("value"))]);

        assert!(record.get("key").is_some());
        assert_eq!(record.get("key"), Some(&text("value")));
        assert!(record.get("nonexistent").is_none());
    }

    #[test]
    fn test_flux_record_get_string() {
        let record = record_with([("name", text("alice")), ("count", Value::Long(42))]);

        assert_eq!(record.get_string("name"), Some("alice".to_string()));
        // A long is not a string.
        assert_eq!(record.get_string("count"), None);
        assert_eq!(record.get_string("nonexistent"), None);
    }

    #[test]
    fn test_flux_record_get_double() {
        let record = record_with([
            ("value", Value::Double(OrderedFloat::from(2.72))),
            ("name", text("test")),
        ]);

        assert_eq!(record.get_double("value"), Some(2.72));
        // A string is not a double.
        assert_eq!(record.get_double("name"), None);
        assert_eq!(record.get_double("nonexistent"), None);
    }

    #[test]
    fn test_flux_record_get_long() {
        let record = record_with([("count", Value::Long(-42)), ("name", text("test"))]);

        assert_eq!(record.get_long("count"), Some(-42));
        // A string is not a long.
        assert_eq!(record.get_long("name"), None);
        assert_eq!(record.get_long("nonexistent"), None);
    }

    #[test]
    fn test_flux_record_get_bool() {
        let record = record_with([("flag", Value::Bool(true)), ("name", text("test"))]);

        assert_eq!(record.get_bool("flag"), Some(true));
        // A string is not a bool.
        assert_eq!(record.get_bool("name"), None);
        assert_eq!(record.get_bool("nonexistent"), None);
    }

    #[test]
    fn test_flux_record_time() {
        let stamp = DateTime::parse_from_rfc3339("2023-11-14T12:00:00Z").unwrap();
        let record = record_with([("_time", Value::TimeRFC(stamp))]);

        assert!(record.time().is_some());
        assert_eq!(record.time().unwrap().year(), 2023);
    }

    #[test]
    fn test_flux_record_time_missing() {
        assert!(FluxRecord::new(0).time().is_none());
    }

    #[test]
    fn test_flux_record_measurement() {
        let record = record_with([("_measurement", text("cpu"))]);

        assert_eq!(record.measurement(), Some("cpu".to_string()));
    }

    #[test]
    fn test_flux_record_measurement_missing() {
        assert!(FluxRecord::new(0).measurement().is_none());
    }

    #[test]
    fn test_flux_record_field() {
        let record = record_with([("_field", text("temperature"))]);

        assert_eq!(record.field(), Some("temperature".to_string()));
    }

    #[test]
    fn test_flux_record_field_missing() {
        assert!(FluxRecord::new(0).field().is_none());
    }

    #[test]
    fn test_flux_record_value() {
        let record = record_with([("_value", Value::Double(OrderedFloat::from(25.5)))]);

        assert!(record.value().is_some());
        assert_eq!(
            record.value(),
            Some(&Value::Double(OrderedFloat::from(25.5)))
        );
    }

    #[test]
    fn test_flux_record_value_missing() {
        assert!(FluxRecord::new(0).value().is_none());
    }
}