use tokio::sync::broadcast;
use std::collections::{BTreeSet, HashSet};
use std::time::Duration;
use crate::{
Bounds, Error, Formula, LogicalMeterHandle, MicrogridClientHandle,
client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
metric,
microgrid::{
pv_bounds_tracker::PvPoolBoundsTracker,
telemetry_tracker::pv_pool_telemetry_tracker::{PvPoolSnapshot, PvPoolTelemetryTracker},
},
quantity::Power,
};
pub struct PvPool {
component_ids: Option<BTreeSet<u64>>,
client: MicrogridClientHandle,
logical_meter: LogicalMeterHandle,
snapshot_tx: Option<broadcast::WeakSender<PvPoolSnapshot>>,
bounds_tx: Option<broadcast::WeakSender<Vec<Bounds<Power>>>>,
}
impl PvPool {
pub(crate) fn try_new(
component_ids: Option<BTreeSet<u64>>,
client: MicrogridClientHandle,
logical_meter: LogicalMeterHandle,
) -> Result<Self, Error> {
let this = Self {
component_ids,
client,
logical_meter,
snapshot_tx: None,
bounds_tx: None,
};
if let Some(ids) = &this.component_ids {
if ids.is_empty() {
let e = "component_ids cannot be an empty set".to_string();
tracing::error!("{e}");
return Err(Error::invalid_component(e));
}
if !ids.is_subset(&this.get_all_pv_inverter_ids()) {
let e = format!("All component_ids {:?} must be PV inverters.", ids);
tracing::error!("{e}");
return Err(Error::invalid_component(e));
}
}
Ok(this)
}
fn get_all_pv_inverter_ids(&self) -> BTreeSet<u64> {
self.logical_meter
.graph()
.components()
.filter(|c| c.is_pv_inverter())
.map(|c| c.id)
.collect()
}
pub(crate) fn get_pv_inverter_ids(&self) -> BTreeSet<u64> {
if let Some(ids) = &self.component_ids {
ids.clone()
} else {
self.get_all_pv_inverter_ids()
}
}
pub fn power(&mut self) -> Result<Formula<Power>, Error> {
self.logical_meter
.pv::<metric::AcPowerActive>(self.component_ids.clone())
}
pub fn power_bounds(&mut self) -> broadcast::Receiver<Vec<Bounds<Power>>> {
if let Some(tx) = self
.bounds_tx
.as_ref()
.and_then(broadcast::WeakSender::upgrade)
&& tx.receiver_count() > 0
{
return tx.subscribe();
}
let snapshot_rx = self.telemetry_snapshots();
let (tx, rx) = broadcast::channel(100);
self.bounds_tx = Some(tx.downgrade());
let tracker = PvPoolBoundsTracker::<metric::AcPowerActive>::new(snapshot_rx, tx);
tokio::spawn(tracker.run());
rx
}
pub fn telemetry_snapshots(&mut self) -> broadcast::Receiver<PvPoolSnapshot> {
if let Some(tx) = self
.snapshot_tx
.as_ref()
.and_then(broadcast::WeakSender::upgrade)
&& tx.receiver_count() > 0
{
return tx.subscribe();
}
let (tx, rx) = broadcast::channel(100);
self.snapshot_tx = Some(tx.downgrade());
let tracker = PvPoolTelemetryTracker::new(
self.get_pv_inverter_ids(),
Duration::from_secs(10),
HashSet::from([
ElectricalComponentStateCode::Ready,
ElectricalComponentStateCode::Standby,
ElectricalComponentStateCode::Discharging,
]),
self.client.clone(),
tx,
);
tokio::spawn(tracker.run());
rx
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use chrono::TimeDelta;
use super::PvPool;
use crate::{
LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle,
client::test_utils::{MockComponent, MockMicrogridApiClient},
};
async fn handles(graph: MockComponent) -> (MicrogridClientHandle, LogicalMeterHandle) {
let api = MockMicrogridApiClient::new(graph);
let client = MicrogridClientHandle::new_from_client(api);
let lm = LogicalMeterHandle::try_new(
client.clone(),
LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()),
)
.await
.unwrap();
(client, lm)
}
fn graph() -> MockComponent {
MockComponent::grid(1).with_children(vec![MockComponent::meter(2).with_children(vec![
MockComponent::meter(3).with_children(vec![
MockComponent::pv_inverter(4),
MockComponent::pv_inverter(5),
]),
MockComponent::meter(6).with_children(vec![
MockComponent::battery_inverter(7).with_children(vec![MockComponent::battery(8)]),
]),
])])
}
#[tokio::test]
async fn try_new_rejects_empty_component_ids() {
let (client, lm) = handles(graph()).await;
let err = PvPool::try_new(Some(BTreeSet::new()), client, lm)
.err()
.expect("empty component_ids should be rejected");
assert!(err.to_string().contains("empty"), "unexpected error: {err}");
}
#[tokio::test]
async fn try_new_rejects_non_pv_component_ids() {
let (client, lm) = handles(graph()).await;
let err = PvPool::try_new(Some([4, 7, 8].into()), client, lm)
.err()
.expect("non-PV component_ids should be rejected");
assert!(
err.to_string().contains("must be PV inverters"),
"unexpected error: {err}"
);
}
#[tokio::test]
async fn power_formula_for_explicit_pv_inverters() {
let (client, lm) = handles(graph()).await;
let mut pool = PvPool::try_new(Some([4, 5].into()), client, lm).unwrap();
let formula = pool.power().unwrap();
assert_eq!(
formula.to_string(),
"METRIC_AC_POWER_ACTIVE::(COALESCE(#3, COALESCE(#5, 0.0) + COALESCE(#4, 0.0)))"
);
}
#[tokio::test]
async fn power_formula_for_all_pv_inverters() {
let (client, lm) = handles(graph()).await;
let mut pool = PvPool::try_new(None, client, lm).unwrap();
let formula = pool.power().unwrap();
assert_eq!(
formula.to_string(),
"METRIC_AC_POWER_ACTIVE::(COALESCE(#3, COALESCE(#5, 0.0) + COALESCE(#4, 0.0)))"
);
}
}