Skip to main content

frequenz_microgrid/microgrid/telemetry_tracker/
pv_pool_telemetry_tracker.rs

1// License: MIT
2// Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3
4//! A telemetry tracker for a pool of PV inverters.
5//!
6//! The tracker spawns a [`ComponentTelemetryTracker`] per inverter and emits a
7//! [`PvPoolSnapshot`], partitioning the inverters into healthy and unhealthy
8//! sets, whenever any inverter's telemetry or health classification changes.
9
10use std::{
11    collections::{BTreeSet, HashMap, HashSet},
12    time::Duration,
13};
14
15use tokio::sync::{broadcast, mpsc};
16
17use crate::{
18    Error, MicrogridClientHandle,
19    client::proto::common::microgrid::electrical_components::{
20        ElectricalComponentStateCode, ElectricalComponentTelemetry,
21    },
22};
23
24use super::component_telemetry_tracker::{ComponentHealthStatus, ComponentTelemetryTracker};
25
/// A snapshot of a PV pool's inverters, partitioned by health status and
/// annotated with the latest telemetry sample for each.
///
/// `healthy_inverters` holds the most recent [`ElectricalComponentTelemetry`]
/// observed for each healthy inverter. `unhealthy_inverters` holds the last
/// telemetry observed before the inverter became unhealthy, or `None` if no
/// sample has been received yet. Consumers can use the telemetry (including
/// per-metric bounds) directly without subscribing to the raw streams again.
#[derive(Clone, Debug, PartialEq)]
pub struct PvPoolSnapshot {
    /// Inverters currently classified as healthy, keyed by component ID, each
    /// mapped to the telemetry attached to its latest healthy report.
    pub healthy_inverters: HashMap<u64, ElectricalComponentTelemetry>,
    /// Inverters currently classified as unhealthy, keyed by component ID.
    /// The value is the telemetry attached to the latest unhealthy report, or
    /// `None` for an inverter that has not reported anything yet.
    pub unhealthy_inverters: HashMap<u64, Option<ElectricalComponentTelemetry>>,
}
39
/// A tracker that watches every PV inverter in the pool and emits a
/// [`PvPoolSnapshot`] whenever any inverter's telemetry or health
/// classification changes.
#[derive(Clone)]
pub struct PvPoolTelemetryTracker {
    /// IDs of the inverters to watch; one telemetry stream and one
    /// [`ComponentTelemetryTracker`] is created per ID.
    component_ids: BTreeSet<u64>,
    /// Broadcast channel on which the pool snapshots are published.
    component_pool_status_tx: broadcast::Sender<PvPoolSnapshot>,
    /// Forwarded unchanged to every [`ComponentTelemetryTracker`].
    // NOTE(review): presumably the longest gap between samples before an
    // inverter is reclassified as unhealthy — confirm against the component
    // tracker.
    missing_data_tolerance: Duration,
    /// State codes handed to every [`ComponentTelemetryTracker`]; judging by
    /// the name, the codes that count as healthy.
    healthy_state_codes: HashSet<ElectricalComponentStateCode>,
    /// Client handle used to open the per-inverter telemetry streams.
    client: MicrogridClientHandle,
}
51
52impl PvPoolTelemetryTracker {
53    pub(crate) fn new(
54        component_ids: BTreeSet<u64>,
55        missing_data_tolerance: Duration,
56        healthy_state_codes: HashSet<ElectricalComponentStateCode>,
57        client: MicrogridClientHandle,
58        component_pool_status_tx: broadcast::Sender<PvPoolSnapshot>,
59    ) -> Self {
60        Self {
61            component_ids,
62            component_pool_status_tx,
63            missing_data_tolerance,
64            healthy_state_codes,
65            client,
66        }
67    }
68
69    pub async fn run(self) -> Result<(), Error> {
70        if self.component_ids.is_empty() {
71            let e = "No component IDs provided for PvPoolTelemetryTracker".to_string();
72            tracing::error!("{}", e);
73            return Err(Error::component_data_error(e));
74        }
75
76        let mut healthy_inverters: HashMap<u64, ElectricalComponentTelemetry> = HashMap::new();
77        let mut unhealthy_inverters: HashMap<u64, Option<ElectricalComponentTelemetry>> =
78            HashMap::new();
79
80        let (status_tx, mut status_rx) = mpsc::channel(100);
81        for &inverter_id in &self.component_ids {
82            let component_data_stream = self
83                .client
84                .receive_electrical_component_telemetry_stream(inverter_id)
85                .await?;
86            let tracker = ComponentTelemetryTracker::new(
87                inverter_id,
88                self.missing_data_tolerance,
89                self.healthy_state_codes.clone(),
90                component_data_stream,
91                status_tx.clone(),
92            );
93            // Spawn a task for each component telemetry tracker.
94            tokio::spawn(async move {
95                tracker.run().await;
96            });
97            // Initially mark the inverter as unhealthy until we see data.
98            unhealthy_inverters.insert(inverter_id, None);
99        }
100
101        // Drop the original sender so the channel closes once every component
102        // tracker finishes, which signals the main loop to stop.
103        drop(status_tx);
104
105        let mut interval = tokio::time::interval(Duration::from_millis(200));
106        let mut last_sent: Option<PvPoolSnapshot> = None;
107
108        loop {
109            tokio::select! {
110                maybe_status = status_rx.recv() => {
111                    match maybe_status {
112                        Some(ComponentHealthStatus::Healthy(id, data)) => {
113                            healthy_inverters.insert(id, data);
114                            unhealthy_inverters.remove(&id);
115                        }
116                        Some(ComponentHealthStatus::Unhealthy(id, data)) => {
117                            unhealthy_inverters.insert(id, data);
118                            healthy_inverters.remove(&id);
119                        }
120                        // Every component tracker has exited and dropped its
121                        // sender, so no further updates will arrive. The
122                        // `interval.tick()` arm never disables, so the `select!`
123                        // `else` can never run; break here instead.
124                        None => break,
125                    }
126                },
127                _ = interval.tick() => {
128                    // Skip sending if the partitioning hasn't changed.
129                    let unchanged = last_sent.as_ref().is_some_and(|s| {
130                        s.healthy_inverters == healthy_inverters
131                            && s.unhealthy_inverters == unhealthy_inverters
132                    });
133                    if unchanged {
134                        continue;
135                    }
136                    let snapshot = PvPoolSnapshot {
137                        healthy_inverters: healthy_inverters.clone(),
138                        unhealthy_inverters: unhealthy_inverters.clone(),
139                    };
140                    if let Err(e) = self.component_pool_status_tx.send(snapshot.clone()) {
141                        tracing::error!("Failed to send PV pool snapshot: {}", e);
142                        break;
143                    }
144                    last_sent = Some(snapshot);
145                },
146            }
147        }
148
149        let err = format!(
150            "PvPoolTelemetryTracker (component IDs {:?}) stopped receiving inverter telemetry updates.",
151            self.component_ids
152        );
153        tracing::error!("{}", err);
154        Err(Error::component_data_error(err))
155    }
156}
157
#[cfg(test)]
mod tests {
    use chrono::TimeDelta;

    use super::PvPoolSnapshot;
    use crate::{
        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle,
        client::{
            proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
            test_utils::{MockComponent, MockMicrogridApiClient},
        },
        microgrid::pv_pool::PvPool,
    };

    /// Builds a [`PvPool`] on top of a mock microgrid API serving `graph`.
    async fn new_pool(graph: MockComponent) -> PvPool {
        let api = MockMicrogridApiClient::new(graph);
        let client = MicrogridClientHandle::new_from_client(api);
        let lm = LogicalMeterHandle::try_new(
            client.clone(),
            LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()),
        )
        .await
        .unwrap();
        PvPool::try_new(None, client, lm).unwrap()
    }

    /// Drains `rx` for up to `steps` * 100ms of simulated time, returning the
    /// last snapshot seen.
    ///
    /// Panics if no snapshot arrives within that time.
    async fn last_snapshot(
        rx: &mut tokio::sync::broadcast::Receiver<PvPoolSnapshot>,
        steps: u32,
    ) -> PvPoolSnapshot {
        let mut last = None;
        for _ in 0..steps {
            tokio::time::advance(std::time::Duration::from_millis(100)).await;
            loop {
                match rx.try_recv() {
                    Ok(snap) => last = Some(snap),
                    // Lagging only means older snapshots were overwritten;
                    // newer ones are still queued, so keep draining instead of
                    // returning a stale snapshot.
                    Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => continue,
                    // Empty or closed: nothing more to read in this step.
                    Err(_) => break,
                }
            }
        }
        last.expect("no snapshot received")
    }

    /// A single inverter that keeps reporting ends up in the healthy set.
    #[tokio::test(start_paused = true)]
    async fn single_inverter_reaches_healthy_state() {
        // grid → meter → pv_inverter(3)
        let mut pool = new_pool(MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2).with_children(vec![
                MockComponent::pv_inverter(3).with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
            ]),
        ]))
        .await;

        let mut rx = pool.telemetry_snapshots();
        let snap = last_snapshot(&mut rx, 10).await;

        assert!(snap.healthy_inverters.contains_key(&3));
        assert!(snap.unhealthy_inverters.is_empty());
    }

    /// Two reporting inverters both end up in the healthy set.
    #[tokio::test(start_paused = true)]
    async fn two_inverters_both_appear_in_snapshot() {
        // grid → meter → [pv_inverter(3), pv_inverter(4)]
        let mut pool = new_pool(MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2).with_children(vec![
                MockComponent::pv_inverter(3).with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
                MockComponent::pv_inverter(4).with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
            ]),
        ]))
        .await;

        let mut rx = pool.telemetry_snapshots();
        let snap = last_snapshot(&mut rx, 10).await;

        assert!(snap.healthy_inverters.contains_key(&3));
        assert!(snap.healthy_inverters.contains_key(&4));
        assert!(snap.unhealthy_inverters.is_empty());
    }

    /// Two subscriptions taken from the same pool observe the same snapshot.
    #[tokio::test(start_paused = true)]
    async fn calling_telemetry_snapshots_twice_reuses_sender() {
        let mut pool = new_pool(MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2).with_children(vec![
                MockComponent::pv_inverter(3).with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
            ]),
        ]))
        .await;

        let mut rx1 = pool.telemetry_snapshots();
        let mut rx2 = pool.telemetry_snapshots();

        // Advance so both receivers see at least one snapshot.
        tokio::time::advance(std::time::Duration::from_millis(300)).await;

        let snap1 = rx1.recv().await.unwrap();
        let snap2 = rx2.recv().await.unwrap();
        assert_eq!(
            snap1, snap2,
            "both subscriptions should observe the same snapshot"
        );
    }

    /// An inverter that goes silent is moved from the healthy to the
    /// unhealthy set.
    #[tokio::test(start_paused = true)]
    async fn inverter_becomes_unhealthy_when_data_stops() {
        // A handful of samples then silence; the stream stays open so the
        // client actor doesn't reconnect and resupply data.
        let mut pool = new_pool(MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2).with_children(vec![
                MockComponent::pv_inverter(3)
                    .with_power(vec![0.0, 0.0, 0.0])
                    .with_silence_after_metrics(),
            ]),
        ]))
        .await;

        let mut rx = pool.telemetry_snapshots();

        // First confirm the inverter reaches a healthy state.
        let healthy = last_snapshot(&mut rx, 10).await;
        assert!(
            healthy.healthy_inverters.contains_key(&3),
            "expected inverter to go healthy after initial samples, got {:?}",
            healthy
        );

        // Advance well past the 10s missing-data tolerance — the component
        // tracker should fire its interval and reclassify the inverter.
        tokio::time::advance(std::time::Duration::from_secs(15)).await;
        let unhealthy = last_snapshot(&mut rx, 5).await;

        assert!(
            unhealthy.healthy_inverters.is_empty(),
            "inverter should be unhealthy after data stops, got healthy set {:?}",
            unhealthy.healthy_inverters.keys()
        );
        assert!(unhealthy.unhealthy_inverters.contains_key(&3));
    }

    /// An inverter reporting a non-healthy state code is classified as
    /// unhealthy even while data keeps flowing.
    #[tokio::test(start_paused = true)]
    async fn inverter_with_bad_state_is_unhealthy() {
        // Inverter reports an Error state — it must land in the unhealthy set
        // even though samples keep arriving.
        let mut pool = new_pool(MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2).with_children(vec![
                MockComponent::pv_inverter(3)
                    .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
                    .with_state(ElectricalComponentStateCode::Error),
            ]),
        ]))
        .await;

        let mut rx = pool.telemetry_snapshots();
        let snap = last_snapshot(&mut rx, 10).await;

        assert!(
            !snap.healthy_inverters.contains_key(&3),
            "inverter with Error state should not be in healthy set"
        );
        assert!(
            snap.unhealthy_inverters.contains_key(&3),
            "inverter with Error state should be in unhealthy set, got {:?}",
            snap
        );
    }
}