rinfluxdb_dataframe/
lib.rs

1// Copyright Claudio Mattera 2021.
2// Distributed under the MIT License or Apache 2.0 License at your option.
3// See accompanying files License-MIT.txt and License-Apache-2.0, or online at
4// https://opensource.org/licenses/MIT
5// https://opensource.org/licenses/Apache-2.0
6
7//! Dummy dataframe implementation
8
9use std::collections::HashMap;
10use std::convert::TryFrom;
11use std::fmt;
12
13use chrono::{DateTime, Utc};
14
15use rinfluxdb_types::{DataFrameError, Value};
16
17/// Column type
18#[derive(Clone, Debug, PartialEq)]
19pub enum Column {
20    /// A column of floating point values
21    Float(Vec<f64>),
22
23    /// A column of integer values
24    Integer(Vec<i64>),
25
26    /// A column of unsigned integer values
27    UnsignedInteger(Vec<u64>),
28
29    /// A column of string values
30    String(Vec<String>),
31
32    /// A column of boolean values
33    Boolean(Vec<bool>),
34
35    /// A column of datetime values
36    Timestamp(Vec<DateTime<Utc>>),
37}
38
39impl Column {
40    fn display_index(&self, index: usize, f: &mut fmt::Formatter) -> fmt::Result {
41        match self {
42            Column::Float(values) => write!(f, "{:16}  ", values[index])?,
43            Column::Integer(values) => write!(f, "{:16}  ", values[index])?,
44            Column::UnsignedInteger(values) => write!(f, "{:16}  ", values[index])?,
45            Column::String(values) => write!(f, "{:16}  ", values[index])?,
46            Column::Boolean(values) => write!(f, "{:16}  ", values[index])?,
47            Column::Timestamp(values) => write!(f, "{:16}  ", values[index])?,
48        }
49
50        Ok(())
51    }
52}
53
54/// A time-indexed dataframe
55///
56/// A dataframe contains multiple named columns indexed by the same index.
57#[derive(Clone, Debug)]
58pub struct DataFrame {
59    name: String,
60    index: Vec<DateTime<Utc>>,
61    columns: HashMap<String, Column>,
62}
63
64impl fmt::Display for DataFrame {
65    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
66        write!(f, "{:>23}  ", "datetime")?;
67        for column in self.columns.keys() {
68            write!(f, "{:>16}  ", column)?;
69        }
70        write!(f, "\n-----------------------  ")?;
71        for _column in self.columns.keys() {
72            write!(f, "----------------  ")?;
73        }
74        writeln!(f)?;
75
76        for (i, index) in self.index.iter().enumerate() {
77            write!(f, "{:>23}  ", index)?;
78            for column in self.columns.values() {
79                column.display_index(i, f)?;
80            }
81            writeln!(f)?;
82        }
83
84        Ok(())
85    }
86}
87
88impl TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>)> for DataFrame {
89    type Error = DataFrameError;
90
91    fn try_from(
92        (name, index, columns): (String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>),
93    ) -> Result<Self, Self::Error> {
94        let columns: HashMap<String, Result<Column, Self::Error>> = columns
95            .into_iter()
96            .map(|(name, column)| {
97                let column = match column.first() {
98                    Some(Value::Float(_)) => Ok(Column::Float(
99                        column
100                            .into_iter()
101                            .map(|element| element.into_f64())
102                            .collect(),
103                    )),
104                    Some(Value::Integer(_)) => Ok(Column::Integer(
105                        column
106                            .into_iter()
107                            .map(|element| element.into_i64())
108                            .collect(),
109                    )),
110                    Some(Value::UnsignedInteger(_)) => Ok(Column::UnsignedInteger(
111                        column
112                            .into_iter()
113                            .map(|element| element.into_u64())
114                            .collect(),
115                    )),
116                    Some(Value::String(_)) => Ok(Column::String(
117                        column
118                            .into_iter()
119                            .map(|element| element.into_string())
120                            .collect(),
121                    )),
122                    Some(Value::Boolean(_)) => Ok(Column::Boolean(
123                        column
124                            .into_iter()
125                            .map(|element| element.into_boolean())
126                            .collect(),
127                    )),
128                    Some(Value::Timestamp(_)) => Ok(Column::Timestamp(
129                        column
130                            .into_iter()
131                            .map(|element| element.into_timestamp())
132                            .collect(),
133                    )),
134                    None => Err(DataFrameError::Creation),
135                };
136                (name, column)
137            })
138            .collect();
139
140        let columns = flatten_map(columns)?;
141
142        Ok(Self {
143            name,
144            index,
145            columns,
146        })
147    }
148}
149
/// Turns a map of results into a result of a map.
///
/// Returns `Ok` with all the unwrapped values if every entry of `map` is
/// `Ok`; otherwise returns the first `Err` met while iterating over `map`,
/// discarding everything else.
fn flatten_map<K, V, E>(map: HashMap<K, Result<V, E>>) -> Result<HashMap<K, V>, E>
where
    K: Eq + std::hash::Hash,
    E: std::error::Error,
{
    let mut flattened = HashMap::new();

    for (key, outcome) in map {
        // `?` stops at the first failed entry and hands its error back.
        flattened.insert(key, outcome?);
    }

    Ok(flattened)
}