use tokio::sync::broadcast;
use std::collections::{BTreeSet, HashSet};
use std::time::Duration;
use crate::{
Bounds, Error, Formula, LogicalMeterHandle, MicrogridClientHandle,
client::{
ElectricalComponentCategory,
proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
},
metric,
microgrid::{
battery_bounds_tracker::BatteryPoolBoundsTracker,
telemetry_tracker::battery_pool_telemetry_tracker::{
BatteryPoolSnapshot, BatteryPoolTelemetryTracker,
},
},
quantity::Power,
};
/// Handle for a set of batteries in the microgrid, giving access to a power
/// formula, power bounds and telemetry snapshots for them.
///
/// The background trackers that feed the bounds and snapshot channels are only
/// spawned when a receiver is first requested.
pub struct BatteryPool {
/// Explicit battery component IDs for this pool. When `None`,
/// `get_battery_ids` falls back to every battery found in the logical
/// meter's component graph.
component_ids: Option<BTreeSet<u64>>,
/// Microgrid client handle; a clone is handed to the telemetry tracker.
client: MicrogridClientHandle,
/// Logical meter handle, used to inspect the component graph and to build
/// the power formula; a clone is handed to the telemetry tracker.
logical_meter: LogicalMeterHandle,
/// Weak handle to the sender of the telemetry snapshot channel. `None`
/// until `telemetry_snapshots` has spawned a tracker.
snapshot_tx: Option<broadcast::WeakSender<BatteryPoolSnapshot>>,
/// Weak handle to the sender of the power bounds channel. `None` until
/// `power_bounds` has spawned a tracker.
bounds_tx: Option<broadcast::WeakSender<Vec<Bounds<Power>>>>,
}
impl BatteryPool {
    /// Builds a battery pool, validating any explicitly requested component IDs.
    ///
    /// When `component_ids` is `None` no validation is performed.
    ///
    /// # Errors
    ///
    /// Returns an invalid-component `Error` (and logs it) when `component_ids`
    /// is `Some` and either
    /// - the set is empty, or
    /// - the set is not a subset of the battery IDs in the logical meter's graph.
    pub(crate) fn try_new(
        component_ids: Option<BTreeSet<u64>>,
        client: MicrogridClientHandle,
        logical_meter: LogicalMeterHandle,
    ) -> Result<Self, Error> {
        // The pool is assembled first because the battery lookup below goes
        // through its logical meter handle.
        let pool = Self {
            component_ids,
            client,
            logical_meter,
            snapshot_tx: None,
            bounds_tx: None,
        };

        if let Some(requested) = pool.component_ids.as_ref() {
            // The emptiness check comes first, so the graph is only consulted
            // for non-empty requests.
            let violation = if requested.is_empty() {
                Some("component_ids cannot be an empty set".to_string())
            } else if !requested.is_subset(&pool.get_all_battery_ids()) {
                Some(format!("All component_ids {:?} must be batteries.", requested))
            } else {
                None
            };

            if let Some(e) = violation {
                tracing::error!("{e}");
                return Err(Error::invalid_component(e));
            }
        }

        Ok(pool)
    }

    /// Collects the IDs of every component in the logical meter's graph whose
    /// category is `ElectricalComponentCategory::Battery`.
    fn get_all_battery_ids(&self) -> BTreeSet<u64> {
        self.logical_meter
            .graph()
            .components()
            .filter_map(|component| {
                (component.category() == ElectricalComponentCategory::Battery)
                    .then_some(component.id)
            })
            .collect()
    }

    /// Returns the battery IDs this pool covers: the explicitly configured set
    /// if there is one, otherwise every battery in the graph.
    pub(crate) fn get_battery_ids(&self) -> BTreeSet<u64> {
        match &self.component_ids {
            Some(configured) => configured.clone(),
            None => self.get_all_battery_ids(),
        }
    }

    /// Requests an AC active power formula for this pool from the logical meter.
    ///
    /// The pool's configured component IDs (possibly `None`) are passed through
    /// unchanged.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the logical meter reports.
    pub fn power(&mut self) -> Result<Formula<Power>, Error> {
        let battery_ids = self.component_ids.clone();
        self.logical_meter
            .battery::<metric::AcPowerActive>(battery_ids)
    }

    /// Returns a receiver for this pool's power bounds.
    ///
    /// If a bounds channel created earlier is still alive and has at least one
    /// receiver, the new receiver is subscribed to it. Otherwise a fresh channel
    /// is created and a `BatteryPoolBoundsTracker`, fed from
    /// `telemetry_snapshots`, is spawned onto the tokio runtime to publish to it.
    pub fn power_bounds(&mut self) -> broadcast::Receiver<Vec<Bounds<Power>>> {
        if let Some(receiver) = Self::subscribe_if_active(self.bounds_tx.as_ref()) {
            return receiver;
        }

        let snapshots = self.telemetry_snapshots();
        let (bounds_tx, bounds_rx) = broadcast::channel(100);
        // Only a weak handle is kept; the strong sender moves into the tracker.
        self.bounds_tx = Some(bounds_tx.downgrade());

        let tracker = BatteryPoolBoundsTracker::<metric::AcPowerActive, metric::DcPower>::new(
            snapshots, bounds_tx,
        );
        tokio::spawn(tracker.run());

        bounds_rx
    }

    /// Returns a receiver for telemetry snapshots of this pool's batteries.
    ///
    /// If a snapshot channel created earlier is still alive and has at least one
    /// receiver, the new receiver is subscribed to it. Otherwise a fresh channel
    /// is created and a `BatteryPoolTelemetryTracker` is spawned onto the tokio
    /// runtime to publish to it.
    pub(crate) fn telemetry_snapshots(&mut self) -> broadcast::Receiver<BatteryPoolSnapshot> {
        if let Some(receiver) = Self::subscribe_if_active(self.snapshot_tx.as_ref()) {
            return receiver;
        }

        let (snapshot_tx, snapshot_rx) = broadcast::channel(100);
        // Only a weak handle is kept; the strong sender moves into the tracker.
        self.snapshot_tx = Some(snapshot_tx.downgrade());

        // NOTE(review): presumably the component states in which a battery is
        // treated as usable - confirm against `BatteryPoolTelemetryTracker`.
        let state_codes = HashSet::from([
            ElectricalComponentStateCode::Ready,
            ElectricalComponentStateCode::Standby,
            ElectricalComponentStateCode::Charging,
            ElectricalComponentStateCode::Discharging,
            ElectricalComponentStateCode::RelayClosed,
        ]);

        let tracker = BatteryPoolTelemetryTracker::new(
            self.get_battery_ids(),
            // NOTE(review): meaning of this 10 s duration is defined by the
            // tracker (not visible here) - confirm before changing it.
            Duration::from_secs(10),
            state_codes,
            self.client.clone(),
            self.logical_meter.clone(),
            snapshot_tx,
        );
        tokio::spawn(tracker.run());

        snapshot_rx
    }

    /// Subscribes to the channel behind `weak` if it can still be upgraded to a
    /// strong sender and that sender currently has at least one receiver.
    ///
    /// Returns `None` when no sender was recorded, the channel's senders are all
    /// gone, or nobody is listening any more.
    fn subscribe_if_active<T: Clone>(
        weak: Option<&broadcast::WeakSender<T>>,
    ) -> Option<broadcast::Receiver<T>> {
        weak.and_then(broadcast::WeakSender::upgrade)
            .filter(|sender| sender.receiver_count() > 0)
            .map(|sender| sender.subscribe())
    }
}