use std::marker::PhantomData;
use tokio::sync::broadcast;
use crate::client::proto::common::metrics::Bounds as PbBounds;
use crate::microgrid::bounds_aggregation::aggregate_parallel;
use crate::microgrid::telemetry_tracker::pv_pool_telemetry_tracker::PvPoolSnapshot;
use crate::{Bounds, metric::Metric};
/// Turns PV pool snapshots into pool-level bounds for the metric `M`.
///
/// Snapshots arrive on one broadcast channel and, for every snapshot received,
/// a freshly computed list of bounds is published on another.
pub(crate) struct PvPoolBoundsTracker<M>
where
    M: Metric,
{
    /// Source of pool snapshots.
    pool_status_rx: broadcast::Receiver<PvPoolSnapshot>,
    /// Sink for the bounds computed from each snapshot.
    pool_bounds_tx: broadcast::Sender<Vec<Bounds<M::QuantityType>>>,
    /// Ties the tracker to the metric type `M` without storing a value of it.
    _marker: PhantomData<M>,
}
impl<M> PvPoolBoundsTracker<M>
where
M: Metric,
Bounds<M::QuantityType>: From<PbBounds>,
{
pub(crate) fn new(
pool_status_rx: broadcast::Receiver<PvPoolSnapshot>,
pool_bounds_tx: broadcast::Sender<Vec<Bounds<M::QuantityType>>>,
) -> Self {
Self {
pool_status_rx,
pool_bounds_tx,
_marker: PhantomData,
}
}
pub(crate) async fn run(mut self) {
loop {
match self.pool_status_rx.recv().await {
Ok(pool_status) => {
let bounds = Self::compute_pool_bounds(&pool_status);
if self.pool_bounds_tx.send(bounds).is_err() {
tracing::debug!(
"No receivers for {} PV bounds tracker; shutting down.",
M::str_name(),
);
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(
"{} PV bounds tracker lagged by {n} pool status updates.",
M::str_name(),
);
}
Err(broadcast::error::RecvError::Closed) => {
tracing::error!(
"Pool status channel closed; {} PV bounds tracker shutting down.",
M::str_name(),
);
break;
}
}
}
}
fn compute_pool_bounds(status: &PvPoolSnapshot) -> Vec<Bounds<M::QuantityType>> {
aggregate_parallel::<M>(&status.healthy_inverters)
}
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::Bounds;
    use crate::client::proto::common::metrics::{
        Bounds as PbBounds, Metric as MetricPb, MetricSample,
    };
    use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentTelemetry;
    use crate::metric::AcPowerActive;
    use crate::microgrid::telemetry_tracker::pv_pool_telemetry_tracker::PvPoolSnapshot;
    use crate::quantity::Power;

    use super::PvPoolBoundsTracker;

    /// The tracker instantiation exercised by every test in this module.
    type PowerTracker = PvPoolBoundsTracker<AcPowerActive>;

    /// Builds telemetry for component `id` holding one sample of `metric`
    /// that carries the given `(lower, upper)` bounds and no value.
    fn telem_with_metric_bounds(
        id: u64,
        metric: MetricPb,
        bounds: Vec<(Option<f32>, Option<f32>)>,
    ) -> ElectricalComponentTelemetry {
        let bounds: Vec<PbBounds> = bounds
            .into_iter()
            .map(|(lower, upper)| PbBounds { lower, upper })
            .collect();
        let sample = MetricSample {
            sample_time: None,
            metric: metric as i32,
            value: None,
            bounds,
            ..Default::default()
        };
        ElectricalComponentTelemetry {
            electrical_component_id: id,
            metric_samples: vec![sample],
            ..Default::default()
        }
    }

    /// Builds telemetry for component `id` whose only sample is an active
    /// AC power sample with the given bounds.
    fn telem_with_power_bounds(
        id: u64,
        bounds: Vec<(Option<f32>, Option<f32>)>,
    ) -> ElectricalComponentTelemetry {
        telem_with_metric_bounds(id, MetricPb::AcPowerActive, bounds)
    }

    /// Shorthand for a bounds list with one fully specified range.
    fn single_range(lower: f32, upper: f32) -> Vec<(Option<f32>, Option<f32>)> {
        vec![(Some(lower), Some(upper))]
    }

    /// Builds a snapshot whose healthy inverters are `healthy`, keyed by
    /// component id, and which has no unhealthy inverters.
    fn healthy_snapshot(healthy: Vec<ElectricalComponentTelemetry>) -> PvPoolSnapshot {
        PvPoolSnapshot {
            healthy_inverters: healthy
                .into_iter()
                .map(|telemetry| (telemetry.electrical_component_id, telemetry))
                .collect(),
            unhealthy_inverters: HashMap::new(),
        }
    }

    #[test]
    fn single_inverter_uses_its_bounds() {
        let inverter = telem_with_power_bounds(10, single_range(-1000.0, 0.0));
        let snap = healthy_snapshot(vec![inverter]);

        let bounds = PowerTracker::compute_pool_bounds(&snap);

        let expected = vec![Bounds::new(
            Some(Power::from_watts(-1000.0)),
            Some(Power::from_watts(0.0)),
        )];
        assert_eq!(bounds, expected);
    }

    #[test]
    fn parallel_inverters_add() {
        let first = telem_with_power_bounds(10, single_range(-1000.0, 0.0));
        let second = telem_with_power_bounds(11, single_range(-2000.0, 0.0));
        let snap = healthy_snapshot(vec![first, second]);

        let bounds = PowerTracker::compute_pool_bounds(&snap);

        let expected = vec![Bounds::new(
            Some(Power::from_watts(-3000.0)),
            Some(Power::from_watts(0.0)),
        )];
        assert_eq!(bounds, expected);
    }

    #[test]
    fn empty_pool_yields_empty_bounds() {
        let snap = healthy_snapshot(Vec::new());

        let bounds = PowerTracker::compute_pool_bounds(&snap);

        assert!(bounds.is_empty());
    }

    #[test]
    fn unhealthy_inverters_are_excluded() {
        // One healthy inverter plus one unhealthy inverter with much wider bounds.
        let mut snap = healthy_snapshot(vec![telem_with_power_bounds(
            10,
            single_range(-1000.0, 0.0),
        )]);
        snap.unhealthy_inverters.insert(
            11,
            Some(telem_with_power_bounds(11, single_range(-9000.0, 0.0))),
        );

        let bounds = PowerTracker::compute_pool_bounds(&snap);

        // Only the healthy inverter's range shows up in the result.
        let expected = vec![Bounds::new(
            Some(Power::from_watts(-1000.0)),
            Some(Power::from_watts(0.0)),
        )];
        assert_eq!(bounds, expected);
    }

    #[test]
    fn inverter_without_matching_metric_contributes_nothing() {
        // The inverter reports bounds, but for a metric other than active AC power.
        let other = telem_with_metric_bounds(10, MetricPb::AcVoltage, single_range(0.0, 1.0));
        let snap = healthy_snapshot(vec![other]);

        let bounds = PowerTracker::compute_pool_bounds(&snap);

        assert!(bounds.is_empty());
    }
}