1use async_std::task;
18use async_trait::async_trait;
19use futures::channel::mpsc::UnboundedReceiver;
20use futures::stream::{self, StreamExt};
21use influxdb2::Client;
22use influxdb2::models::DataPoint;
23use influxdb2::models::health::Status;
24use log::{info, warn};
25use serde::Deserialize;
26use std::iter::zip;
27use std::sync::Arc;
28use std::time::Duration;
29
30use super::receiver::{Receiver, Update};
31
32pub struct Influxdb2Receiver {
33 client: Client,
34 bucket: String,
35}
36
37impl Influxdb2Receiver {
38 pub async fn new(config: &Config) -> Self {
39 let client = Client::new(&config.host, &config.org, &config.token);
40 match client.health().await {
41 Ok(health_check) => {
42 if health_check.status == Status::Fail {
43 match health_check.message {
44 Some(ref message) => {
45 warn!("Influxdb server is unhealthy: {message}");
46 }
47 None => {
48 warn!("Influxdb server is unhealthy");
49 }
50 }
51 } else {
52 info!(
53 "Successfully connected to Influxdb server at {}",
54 &config.host
55 );
56 }
57 }
58 Err(err) => {
59 warn!("Could not connect to Influxdb server: {err}");
60 }
61 }
62 Self {
63 client,
64 bucket: config.bucket.to_owned(),
65 }
66 }
67}
68
69#[async_trait]
70impl Receiver for Influxdb2Receiver {
71 async fn run<'a>(&mut self, mut receiver: UnboundedReceiver<Arc<Update<'a>>>) {
72 while let Some(update) = receiver.next().await {
73 let mut points = vec![];
74 for (field, value) in zip(update.fields.iter(), update.values.iter()) {
75 let build = DataPoint::builder("inverter")
76 .timestamp(update.timestamp)
77 .tag("serial", update.serial.as_str())
78 .tag("group", field.group)
79 .tag("name", field.name);
80 let build = if field.unit.is_empty() {
81 build
82 } else {
83 build.tag("unit", field.unit)
84 };
85 let build = build.field("value", *value).build();
86 match build {
87 Ok(value) => {
88 points.push(value);
89 }
90 Err(err) => {
91 warn!("Error building point: {err:?}");
92 }
93 }
94 }
95 if !points.is_empty() {
96 loop {
97 match self
98 .client
99 .write(self.bucket.as_str(), stream::iter(points.clone()))
100 .await
101 {
102 Ok(_) => {
103 break;
104 }
105 Err(err) => {
106 info!("Error writing to Influxdb; trying again in 5s ({err:?})");
107 task::sleep(Duration::from_secs(5)).await;
108 }
109 }
110 }
111 }
112 }
113 }
114}
115
116#[derive(Deserialize)]
117#[serde(deny_unknown_fields)]
118pub struct Config {
119 #[serde(default = "default_host")]
120 pub host: String,
121 pub org: String,
122 pub token: String,
123 pub bucket: String,
124}
125
/// Returns the host URL used when the configuration does not specify one.
fn default_host() -> String {
    String::from("http://localhost:8086")
}