sunsniff/
influxdb2.rs

1/* Copyright 2022 Bruce Merry
2 *
3 * This program is free software: you can redistribute it and/or modify it
4 * under the terms of the GNU General Public License as published by the Free
5 * Software Foundation, either version 3 of the License, or (at your option)
6 * any later version.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program. If not, see <https://www.gnu.org/licenses/>.
15 */
16
17use async_std::task;
18use async_trait::async_trait;
19use futures::channel::mpsc::UnboundedReceiver;
20use futures::stream::{self, StreamExt};
21use influxdb2::Client;
22use influxdb2::models::DataPoint;
23use influxdb2::models::health::Status;
24use log::{info, warn};
25use serde::Deserialize;
26use std::iter::zip;
27use std::sync::Arc;
28use std::time::Duration;
29
30use super::receiver::{Receiver, Update};
31
32pub struct Influxdb2Receiver {
33    client: Client,
34    bucket: String,
35}
36
37impl Influxdb2Receiver {
38    pub async fn new(config: &Config) -> Self {
39        let client = Client::new(&config.host, &config.org, &config.token);
40        match client.health().await {
41            Ok(health_check) => {
42                if health_check.status == Status::Fail {
43                    match health_check.message {
44                        Some(ref message) => {
45                            warn!("Influxdb server is unhealthy: {message}");
46                        }
47                        None => {
48                            warn!("Influxdb server is unhealthy");
49                        }
50                    }
51                } else {
52                    info!(
53                        "Successfully connected to Influxdb server at {}",
54                        &config.host
55                    );
56                }
57            }
58            Err(err) => {
59                warn!("Could not connect to Influxdb server: {err}");
60            }
61        }
62        Self {
63            client,
64            bucket: config.bucket.to_owned(),
65        }
66    }
67}
68
69#[async_trait]
70impl Receiver for Influxdb2Receiver {
71    async fn run<'a>(&mut self, mut receiver: UnboundedReceiver<Arc<Update<'a>>>) {
72        while let Some(update) = receiver.next().await {
73            let mut points = vec![];
74            for (field, value) in zip(update.fields.iter(), update.values.iter()) {
75                let build = DataPoint::builder("inverter")
76                    .timestamp(update.timestamp)
77                    .tag("serial", update.serial.as_str())
78                    .tag("group", field.group)
79                    .tag("name", field.name);
80                let build = if field.unit.is_empty() {
81                    build
82                } else {
83                    build.tag("unit", field.unit)
84                };
85                let build = build.field("value", *value).build();
86                match build {
87                    Ok(value) => {
88                        points.push(value);
89                    }
90                    Err(err) => {
91                        warn!("Error building point: {err:?}");
92                    }
93                }
94            }
95            if !points.is_empty() {
96                loop {
97                    match self
98                        .client
99                        .write(self.bucket.as_str(), stream::iter(points.clone()))
100                        .await
101                    {
102                        Ok(_) => {
103                            break;
104                        }
105                        Err(err) => {
106                            info!("Error writing to Influxdb; trying again in 5s ({err:?})");
107                            task::sleep(Duration::from_secs(5)).await;
108                        }
109                    }
110                }
111            }
112        }
113    }
114}
115
116#[derive(Deserialize)]
117#[serde(deny_unknown_fields)]
118pub struct Config {
119    #[serde(default = "default_host")]
120    pub host: String,
121    pub org: String,
122    pub token: String,
123    pub bucket: String,
124}
125
/// Returns the host URL used when the configuration does not specify one.
fn default_host() -> String {
    String::from("http://localhost:8086")
}