Skip to main content

frequenz_microgrid/microgrid/
pv_pool.rs

1// License: MIT
2// Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3
4//! Representation of a pool of PV inverters in the microgrid.
5//!
6//! A [`PvPool`] aggregates a set of PV inverters — either an explicit subset or
7//! every PV inverter in the microgrid — and exposes their combined active
8//! power, their aggregated active-power bounds, and a health-partitioned
9//! telemetry snapshot stream.
10//!
11//! Obtain one from [`Microgrid::pv_pool`]; see [`PvPool`] for a usage example.
12//!
13//! [`Microgrid::pv_pool`]: crate::Microgrid::pv_pool
14
15use tokio::sync::broadcast;
16
17use std::collections::{BTreeSet, HashSet};
18use std::time::Duration;
19
20use crate::{
21    Bounds, Error, Formula, LogicalMeterHandle, MicrogridClientHandle,
22    client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
23    metric,
24    microgrid::{
25        pv_bounds_tracker::PvPoolBoundsTracker,
26        telemetry_tracker::pv_pool_telemetry_tracker::{PvPoolSnapshot, PvPoolTelemetryTracker},
27    },
28    quantity::Power,
29};
30
31/// A pool of PV inverters in the microgrid.
32///
33/// Created with [`Microgrid::pv_pool`][mg], passing either an explicit set of PV
34/// inverter component IDs or `None` to cover every PV inverter in the microgrid.
35/// It exposes:
36///
37/// - [`power`](Self::power) — a [`Formula`] for the pool's aggregate active
38///   power;
39/// - [`power_bounds`](Self::power_bounds) — a stream of the pool's aggregated
40///   active-power bounds;
41/// - [`telemetry_snapshots`](Self::telemetry_snapshots) — a stream of
42///   [`PvPoolSnapshot`]s partitioning the inverters into healthy and unhealthy
43///   sets.
44///
45/// The bounds and snapshot streams share a telemetry tracker that is started on
46/// first use and reused while it still has live receivers.
47///
48/// # Example
49///
50/// ```no_run
51/// # async fn example() -> Result<(), frequenz_microgrid::Error> {
52/// use chrono::TimeDelta;
53/// use frequenz_microgrid::{LogicalMeterConfig, Microgrid};
54///
55/// let microgrid = Microgrid::try_new(
56///     "grpc://localhost:50051",
57///     LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()),
58/// )
59/// .await?;
60///
61/// // A pool over every PV inverter in the microgrid.
62/// let mut pv_pool = microgrid.pv_pool(None)?;
63///
64/// // Subscribe to the pool's aggregated active-power bounds.
65/// let mut bounds_rx = pv_pool.power_bounds();
66/// while let Ok(bounds) = bounds_rx.recv().await {
67///     println!("PV pool active-power bounds: {bounds:?}");
68/// }
69/// # Ok(())
70/// # }
71/// ```
72///
73/// [mg]: crate::Microgrid::pv_pool
74pub struct PvPool {
75    component_ids: Option<BTreeSet<u64>>,
76    client: MicrogridClientHandle,
77    logical_meter: LogicalMeterHandle,
78    snapshot_tx: Option<broadcast::WeakSender<PvPoolSnapshot>>,
79    bounds_tx: Option<broadcast::WeakSender<Vec<Bounds<Power>>>>,
80}
81
82impl PvPool {
83    /// Creates a new `PvPool` instance with the given component IDs, client and
84    /// logical meter handles.
85    ///
86    /// When `component_ids` is `Some`, every ID must refer to a PV inverter in
87    /// the component graph; otherwise an error is returned. When it is `None`,
88    /// the pool covers all PV inverters in the microgrid.
89    pub(crate) fn try_new(
90        component_ids: Option<BTreeSet<u64>>,
91        client: MicrogridClientHandle,
92        logical_meter: LogicalMeterHandle,
93    ) -> Result<Self, Error> {
94        let this = Self {
95            component_ids,
96            client,
97            logical_meter,
98            snapshot_tx: None,
99            bounds_tx: None,
100        };
101        if let Some(ids) = &this.component_ids {
102            if ids.is_empty() {
103                let e = "component_ids cannot be an empty set".to_string();
104                tracing::error!("{e}");
105                return Err(Error::invalid_component(e));
106            }
107            // Validate that all provided IDs correspond to PV inverters in the
108            // graph.
109            if !ids.is_subset(&this.get_all_pv_inverter_ids()) {
110                let e = format!("All component_ids {:?} must be PV inverters.", ids);
111                tracing::error!("{e}");
112                return Err(Error::invalid_component(e));
113            }
114        }
115        Ok(this)
116    }
117
118    fn get_all_pv_inverter_ids(&self) -> BTreeSet<u64> {
119        self.logical_meter
120            .graph()
121            .components()
122            .filter(|c| c.is_pv_inverter())
123            .map(|c| c.id)
124            .collect()
125    }
126
127    pub(crate) fn get_pv_inverter_ids(&self) -> BTreeSet<u64> {
128        if let Some(ids) = &self.component_ids {
129            ids.clone()
130        } else {
131            self.get_all_pv_inverter_ids()
132        }
133    }
134
135    /// Returns a formula for the active power of the PV pool.
136    pub fn power(&mut self) -> Result<Formula<Power>, Error> {
137        self.logical_meter
138            .pv::<metric::AcPowerActive>(self.component_ids.clone())
139    }
140
141    /// Returns a receiver for the aggregated active-power bounds of the pool,
142    /// updated on each snapshot.
143    ///
144    /// Reuses the running bounds tracker if one exists and still has active
145    /// receivers; otherwise starts a new one (which also starts or reuses the
146    /// underlying telemetry tracker).
147    pub fn power_bounds(&mut self) -> broadcast::Receiver<Vec<Bounds<Power>>> {
148        if let Some(tx) = self
149            .bounds_tx
150            .as_ref()
151            .and_then(broadcast::WeakSender::upgrade)
152            && tx.receiver_count() > 0
153        {
154            return tx.subscribe();
155        }
156        let snapshot_rx = self.telemetry_snapshots();
157        let (tx, rx) = broadcast::channel(100);
158        self.bounds_tx = Some(tx.downgrade());
159        let tracker = PvPoolBoundsTracker::<metric::AcPowerActive>::new(snapshot_rx, tx);
160        tokio::spawn(tracker.run());
161        rx
162    }
163
164    /// Returns a receiver for a stream of [`PvPoolSnapshot`] values, each
165    /// reflecting the latest inverter telemetry partitioned into healthy and
166    /// unhealthy sets.
167    ///
168    /// Reuses the running tracker if one exists and still has active receivers
169    /// (including any held by a bounds tracker); otherwise starts a new one.
170    pub fn telemetry_snapshots(&mut self) -> broadcast::Receiver<PvPoolSnapshot> {
171        if let Some(tx) = self
172            .snapshot_tx
173            .as_ref()
174            .and_then(broadcast::WeakSender::upgrade)
175            && tx.receiver_count() > 0
176        {
177            return tx.subscribe();
178        }
179        let (tx, rx) = broadcast::channel(100);
180        self.snapshot_tx = Some(tx.downgrade());
181        let tracker = PvPoolTelemetryTracker::new(
182            self.get_pv_inverter_ids(),
183            Duration::from_secs(10),
184            // Operational states in which a PV inverter is alive and
185            // reporting usable telemetry: producing (Discharging), or idle
186            // and ready (Ready / Standby).
187            HashSet::from([
188                ElectricalComponentStateCode::Ready,
189                ElectricalComponentStateCode::Standby,
190                ElectricalComponentStateCode::Discharging,
191            ]),
192            self.client.clone(),
193            tx,
194        );
195        tokio::spawn(tracker.run());
196        rx
197    }
198}
199
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use chrono::TimeDelta;

    use super::PvPool;
    use crate::{
        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle,
        client::test_utils::{MockComponent, MockMicrogridApiClient},
    };

    /// Active-power formula expected for PV inverters 4 and 5 of [`graph`],
    /// whether they are selected explicitly or via "all PV inverters".
    const PV_POWER_FORMULA: &str =
        "METRIC_AC_POWER_ACTIVE::(COALESCE(#3, COALESCE(#5, 0.0) + COALESCE(#4, 0.0)))";

    /// Creates a mock-backed client handle and a logical meter handle on top of
    /// it, both serving the given component graph.
    async fn handles(graph: MockComponent) -> (MicrogridClientHandle, LogicalMeterHandle) {
        let client = MicrogridClientHandle::new_from_client(MockMicrogridApiClient::new(graph));
        let config = LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap());
        let logical_meter = LogicalMeterHandle::try_new(client.clone(), config)
            .await
            .unwrap();
        (client, logical_meter)
    }

    /// Test topology:
    ///
    /// grid(1) → meter(2) → [meter(3) → pv_inverter(4), pv_inverter(5)],
    ///                      [meter(6) → battery_inverter(7) → battery(8)]
    fn graph() -> MockComponent {
        let pv_branch = MockComponent::meter(3).with_children(vec![
            MockComponent::pv_inverter(4),
            MockComponent::pv_inverter(5),
        ]);
        let battery_branch = MockComponent::meter(6).with_children(vec![
            MockComponent::battery_inverter(7).with_children(vec![MockComponent::battery(8)]),
        ]);
        let main_meter = MockComponent::meter(2).with_children(vec![pv_branch, battery_branch]);
        MockComponent::grid(1).with_children(vec![main_meter])
    }

    #[tokio::test]
    async fn try_new_rejects_empty_component_ids() {
        let (client, lm) = handles(graph()).await;
        let Err(err) = PvPool::try_new(Some(BTreeSet::new()), client, lm) else {
            panic!("empty component_ids should be rejected");
        };
        assert!(err.to_string().contains("empty"), "unexpected error: {err}");
    }

    #[tokio::test]
    async fn try_new_rejects_non_pv_component_ids() {
        let (client, lm) = handles(graph()).await;
        // Only 4 is a PV inverter; 7 is a battery inverter and 8 a battery.
        let requested = BTreeSet::from([4, 7, 8]);
        let Err(err) = PvPool::try_new(Some(requested), client, lm) else {
            panic!("non-PV component_ids should be rejected");
        };
        assert!(
            err.to_string().contains("must be PV inverters"),
            "unexpected error: {err}"
        );
    }

    #[tokio::test]
    async fn power_formula_for_explicit_pv_inverters() {
        let (client, lm) = handles(graph()).await;
        let requested = BTreeSet::from([4, 5]);
        let mut pool = PvPool::try_new(Some(requested), client, lm).unwrap();
        let formula = pool.power().unwrap();
        assert_eq!(formula.to_string(), PV_POWER_FORMULA);
    }

    #[tokio::test]
    async fn power_formula_for_all_pv_inverters() {
        let (client, lm) = handles(graph()).await;
        let mut pool = PvPool::try_new(None, client, lm).unwrap();
        let formula = pool.power().unwrap();
        assert_eq!(formula.to_string(), PV_POWER_FORMULA);
    }
}