use std::{
collections::{BTreeMap, HashMap},
fmt::Display,
};
use clap::{Parser, Subcommand};
use flexi_logger::Logger;
use glowmarkt::{
align_to_period, split_periods, Device, Error, ErrorKind, GlowmarktApi, ReadingPeriod, Resource,
};
use influx::Measurement;
use serde::Serialize;
use serde_json::to_string_pretty;
use time::{format_description::well_known::Iso8601, Duration, OffsetDateTime};
use crate::influx::{add_tags_for_device, add_tags_for_resource, field_for_classifier};
mod influx;
// Top-level command-line arguments, parsed by clap's derive API.
//
// Credentials are all optional at the parsing level; `login` decides whether
// the supplied combination is sufficient. Each credential option carries the
// clap `env` attribute, so it may also be provided through the environment.
//
// Plain `//` comments are used here on purpose: clap's derive turns `///`
// doc comments into help text, which would change the program's output.
#[derive(Parser)]
#[clap(author, version)]
struct Args {
// Account username; `login` uses it together with `password` when no usable
// token is available.
#[clap(short, long, env)]
pub username: Option<String>,
// Account password; only used together with `username`.
#[clap(short, long, env)]
pub password: Option<String>,
// Existing API token; `login` tries this first and validates it before
// considering the username/password pair.
#[clap(short, long, env)]
pub token: Option<String>,
// The operation to perform once authenticated.
#[clap(subcommand)]
command: Command,
}
/// Parses a `key=value` command-line tag into a `(key, value)` pair.
///
/// The input is split at the first `=`, so the value part may itself contain
/// further `=` characters. Either side may be empty.
///
/// # Errors
///
/// Returns a descriptive message when `val` contains no `=` at all.
fn parse_tag(val: &str) -> Result<(String, String), String> {
    val.split_once('=')
        .map(|(key, value)| (key.to_owned(), value.to_owned()))
        .ok_or_else(|| {
            format!(
                "Unable to parse tag '{}', no equals sign present.",
                val
            )
        })
}
// The subcommands understood by the tool; `main` dispatches on these.
//
// Plain `//` comments are used here on purpose: clap's derive turns `///`
// doc comments into help text, which would change the program's output.
#[derive(Subcommand)]
enum Command {
// Prints the API token of the authenticated session.
Token,
// Prints devices as pretty JSON; with `id`, only the entry stored under that
// key (or `null` when there is none).
Device {
id: Option<String>,
},
// Prints device types as pretty JSON, optionally restricted to one `id`.
DeviceType {
id: Option<String>,
},
// Prints resource types as pretty JSON, optionally restricted to one `id`.
ResourceType {
id: Option<String>,
},
// Prints resources as pretty JSON, optionally restricted to one `id`.
Resource {
id: Option<String>,
},
// Prints the readings of one resource as pretty JSON.
//
// `from` and `to` are either `-N` (N minutes before now) or an ISO-8601
// timestamp; a missing `to` means "now".
Readings {
resource_id: String,
from: String,
to: Option<String>,
},
// Prints the latest tariff of a resource as pretty JSON.
Tariff {
resource_id: String,
},
// Prints the tariff list of a resource as pretty JSON.
TariffList {
resource_id: String,
},
// Prints readings as `glowmarkt` measurements, one per line
// (presumably InfluxDB line protocol; the format is defined by
// `influx::Measurement` — confirm there).
Influx {
// Restricts the export to a single device id; all devices when absent.
#[clap(short, long, env)]
device: Option<String>,
// Keeps timestamps whose fields are all zero instead of dropping them.
#[clap(short, long, env)]
no_strip: bool,
// Extra `key=value` tags (parsed by `parse_tag`) used as the base tag set
// of every measurement; the option is spelled `--tag` and may repeat.
#[clap(short, long = "tag", value_parser=parse_tag)]
tags: Vec<(String, String)>,
// Same `-N` / ISO-8601 formats as for `Readings`.
from: String,
to: Option<String>,
},
}
/// Resolves a user-supplied date into a timestamp passed through
/// `align_to_period` for `period`.
///
/// Two input forms are accepted:
///
/// * `-N`, meaning `N` minutes before the current UTC time;
/// * anything else is parsed as an ISO-8601 timestamp, which must not lie
///   after the current UTC time.
///
/// # Errors
///
/// Returns a message when the minute offset is not a valid integer, when the
/// timestamp cannot be parsed, or when the timestamp is in the future.
fn parse_date(date: String, period: ReadingPeriod) -> Result<OffsetDateTime, String> {
    let moment = match date.strip_prefix('-') {
        // Relative form: the remainder is a number of minutes to go back.
        Some(minutes_ago) => {
            let minutes_ago = minutes_ago.parse::<i64>().str_err()?;
            OffsetDateTime::now_utc() - Duration::minutes(minutes_ago)
        }
        // Absolute form: a full ISO-8601 timestamp.
        None => {
            let parsed = match OffsetDateTime::parse(&date, &Iso8601::DEFAULT) {
                Ok(parsed) => parsed,
                Err(_) => {
                    return Err(format!(
                        "Couldn't format the date '{date}' as ISO-8601, try '2023-01-01T00:00:00Z'"
                    ));
                }
            };
            if parsed > OffsetDateTime::now_utc() {
                return Err("Cannot use a date that is in the future.".to_string());
            }
            parsed
        }
    };

    Ok(align_to_period(moment, period))
}
fn parse_end_date(date: Option<String>, period: ReadingPeriod) -> Result<OffsetDateTime, String> {
if let Some(date) = date {
if let Some(date) = date.strip_prefix('-') {
let offset = date.parse::<i64>().str_err()?;
Ok(align_to_period(
OffsetDateTime::now_utc() - Duration::minutes(offset),
period,
))
} else {
OffsetDateTime::parse(&date, &Iso8601::DEFAULT)
.map_err(|_| {
format!(
"Couldn't format the date '{date}' as ISO-8601, try '2023-01-01T00:00:00Z'"
)
})
.and_then(|date| {
let now = OffsetDateTime::now_utc();
if date > now {
Err("Cannot use a date that is in the future.".to_string())
} else {
Ok(align_to_period(date, period))
}
})
}
} else {
Ok(align_to_period(OffsetDateTime::now_utc(), period))
}
}
/// Extension trait that flattens the error side of a `Result` into a
/// `String`, so heterogeneous errors can be propagated with `?` from
/// functions returning `Result<_, String>`.
trait ErrorStr<T> {
    /// Converts the error, if any, to its string form; `Ok` is left untouched.
    fn str_err(self) -> Result<T, String>;
}

/// Blanket implementation for every `Result` whose error type is printable.
impl<T, E> ErrorStr<T> for Result<T, E>
where
    E: Display,
{
    fn str_err(self) -> Result<T, String> {
        self.map_err(|error| error.to_string())
    }
}
/// Consumes a string-keyed map and returns just its values.
///
/// The values come out in the map's iteration order, which is unspecified
/// for a `HashMap`.
fn values<T>(map: HashMap<String, T>) -> Vec<T> {
    let mut items = Vec::with_capacity(map.len());
    items.extend(map.into_values());
    items
}
/// Prints the outcome of an API listing call as pretty-printed JSON.
///
/// With an `id`, only the entry stored under that key is printed; a missing
/// key serialises as `null`. Without an `id`, all values are printed as a
/// JSON array.
///
/// # Errors
///
/// Returns the string form of the API error or of any serialisation error.
fn display_result<T: Serialize>(
    items: Result<HashMap<String, T>, Error>,
    id: Option<String>,
) -> Result<(), String> {
    let items = items.str_err()?;

    let rendered = match id {
        Some(id) => to_string_pretty(&items.get(&id)),
        None => to_string_pretty(&values(items)),
    };

    println!("{}", rendered.str_err()?);
    Ok(())
}
/// Prints the half-hourly readings of `resource` between `start` and `end`
/// as pretty JSON.
///
/// The requested span is cut up by `split_periods`, and one JSON document is
/// printed per resulting sub-range. `start` and `end` use the formats
/// accepted by `parse_date` / `parse_end_date`.
///
/// # Errors
///
/// Returns a message for an invalid date, a failed API request or a
/// serialisation failure; output printed before the failure is not undone.
async fn readings(
    api: GlowmarktApi,
    resource: String,
    start: String,
    end: Option<String>,
) -> Result<(), String> {
    let period = ReadingPeriod::HalfHour;
    let first = parse_date(start, period)?;
    let last = parse_end_date(end, period)?;

    for (range_start, range_end) in split_periods(first, last, period) {
        let batch = api
            .readings(&resource, &range_start, &range_end, period)
            .await
            .str_err()?;
        let json = to_string_pretty(&batch).str_err()?;
        println!("{}", json);
    }

    Ok(())
}
/// Fetches the latest tariff of `resource` and prints it as pretty JSON.
///
/// # Errors
///
/// Returns the string form of the API or serialisation error.
async fn latest_tariff(api: GlowmarktApi, resource: String) -> Result<(), String> {
    let latest = api.latest_tariff(&resource).await.str_err()?;
    let json = to_string_pretty(&latest).str_err()?;
    println!("{}", json);
    Ok(())
}
/// Fetches the tariff list of `resource` and prints it as pretty JSON.
///
/// # Errors
///
/// Returns the string form of the API or serialisation error.
async fn tariff_list(api: GlowmarktApi, resource: String) -> Result<(), String> {
    let tariffs = api.tariff_list(&resource).await.str_err()?;
    let json = to_string_pretty(&tariffs).str_err()?;
    println!("{}", json);
    Ok(())
}
/// Exports half-hourly readings as `glowmarkt` measurements, one per line on
/// standard output.
///
/// * `device` restricts the export to one device id; otherwise every device
///   of the account is processed.
/// * `no_strip` disables the removal of timestamps whose fields are all zero.
/// * `tags` is the base tag set; device and resource tags are added on top.
/// * `start` / `end` use the formats accepted by `parse_date` and
///   `parse_end_date`.
///
/// An unknown `device` id is reported on standard error and produces no
/// measurements, but is not treated as a failure.
///
/// # Errors
///
/// Returns a message for an invalid date or for any failed API request. A
/// failed readings request aborts the export rather than silently producing
/// partial output.
async fn influx(
    api: GlowmarktApi,
    device: Option<String>,
    no_strip: bool,
    tags: BTreeMap<String, String>,
    start: String,
    end: Option<String>,
) -> Result<(), String> {
    /// Collects the readings of every sensor of `device` that has a known
    /// resource into `measurements`, grouped by the reading's start time.
    async fn process_device(
        api: &GlowmarktApi,
        tags: &BTreeMap<String, String>,
        resources: &HashMap<String, Resource>,
        device: Device,
        ranges: &[(OffsetDateTime, OffsetDateTime)],
        measurements: &mut BTreeMap<OffsetDateTime, Vec<Measurement>>,
    ) -> Result<(), Error> {
        let mut tags = tags.clone();
        add_tags_for_device(&mut tags, &device);

        for sensor in device.protocol.sensors {
            // Sensors whose resource is not in the account's resource list are skipped.
            if let Some(resource) = resources.get(&sensor.resource_id) {
                let mut tags = tags.clone();
                add_tags_for_resource(&mut tags, resource);

                for (start, end) in ranges {
                    // Propagate request failures: swallowing them here would end the
                    // export early while still reporting success to the caller.
                    let readings = api
                        .readings(&resource.id, start, end, ReadingPeriod::HalfHour)
                        .await?;

                    for reading in readings {
                        let mut measurement =
                            Measurement::new("glowmarkt", reading.start, tags.clone());
                        measurement.add_field(
                            field_for_classifier(&resource.classifier),
                            reading.value as f64,
                        );
                        measurements
                            .entry(reading.start)
                            .or_default()
                            .push(measurement);
                    }
                }
            }
        }

        Ok(())
    }

    let period = ReadingPeriod::HalfHour;
    let start = parse_date(start, period)?;
    let end = parse_end_date(end, period)?;
    let ranges = split_periods(start, end, period);

    // Keyed by timestamp so the output below is in chronological order.
    let mut measurements: BTreeMap<OffsetDateTime, Vec<Measurement>> = BTreeMap::new();
    let resources = api.resources().await?;

    if let Some(device) = device {
        if let Some(device) = api.device(&device).await? {
            process_device(&api, &tags, &resources, device, &ranges, &mut measurements).await?;
        } else {
            eprintln!("Error: Unknown device {}", device);
        }
    } else {
        let devices = api.devices().await?.into_values();
        for device in devices {
            process_device(&api, &tags, &resources, device, &ranges, &mut measurements).await?;
        }
    }

    if !no_strip {
        // Drop every timestamp at which all fields of all measurements are zero.
        // NOTE(review): this removes all-zero timestamps anywhere in the range,
        // including ones in the middle. If only the trailing (most recent) run of
        // zero readings should be stripped, the removal has to stop at the first
        // non-zero timestamp — confirm the intent.
        measurements.retain(|_, batch| {
            !batch
                .iter()
                .all(|m| m.fields.iter().all(|(_, v)| *v == 0.0))
        });
    }

    for (_, measurements) in measurements {
        for measurement in measurements {
            println!("{}", measurement);
        }
    }

    Ok(())
}
/// Builds an authenticated API client from the command-line credentials.
///
/// A supplied token is tried first and returned as soon as it validates. If
/// validation fails specifically because the token is not authenticated, the
/// username/password pair is used instead; any other validation error is
/// returned immediately.
///
/// # Errors
///
/// Returns the string form of the validation or authentication error, or a
/// fixed message when the username/password fallback is needed but the pair
/// is incomplete.
async fn login(args: &Args) -> Result<GlowmarktApi, String> {
    if let Some(token) = args.token.as_ref() {
        let api = GlowmarktApi::new(token);
        match api.validate().await {
            Ok(_) => return Ok(api),
            Err(e) if e.kind != ErrorKind::NotAuthenticated => return Err(e.to_string()),
            // Rejected token: fall through to the username/password login.
            Err(_) => {}
        }
    }

    match (&args.username, &args.password) {
        (Some(username), Some(password)) => GlowmarktApi::authenticate(username, password)
            .await
            .str_err(),
        _ => Err("Must pass username and password.".to_string()),
    }
}
#[tokio::main]
async fn main() -> Result<(), String> {
if let Err(e) = Logger::try_with_env_or_str("info").and_then(|logger| logger.start()) {
eprintln!("Warning, failed to start logging: {}", e);
}
let args = Args::parse();
let api = login(&args).await?;
match args.command {
Command::Token => {
println!("{}", api.token);
Ok(())
}
Command::Device { id } => display_result(api.devices().await, id),
Command::DeviceType { id } => display_result(api.device_types().await, id),
Command::ResourceType { id } => display_result(api.resource_types().await, id),
Command::Resource { id } => display_result(api.resources().await, id),
Command::Readings {
resource_id,
from,
to,
} => readings(api, resource_id, from, to).await,
Command::Tariff { resource_id } => latest_tariff(api, resource_id).await,
Command::TariffList { resource_id } => tariff_list(api, resource_id).await,
Command::Influx {
device,
no_strip,
tags,
from,
to,
} => influx(api, device, no_strip, tags.into_iter().collect(), from, to).await,
}
}